From dde6afc0c01d2ebac5c5532aaf35e416c72a4245 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Wed, 22 Nov 2017 16:31:01 -0800 Subject: C++-ize backoff --- .../client_channel/lb_policy/grpclb/grpclb.cc | 25 ++-- .../resolver/dns/c_ares/dns_resolver_ares.cc | 24 ++-- .../resolver/dns/native/dns_resolver.cc | 24 ++-- src/core/ext/filters/client_channel/subchannel.cc | 84 ++++++++++--- src/core/lib/backoff/backoff.cc | 83 ++++++------- src/core/lib/backoff/backoff.h | 132 ++++++++++++--------- src/core/lib/support/alloc_new.h | 30 +++++ 7 files changed, 254 insertions(+), 148 deletions(-) create mode 100644 src/core/lib/support/alloc_new.h (limited to 'src/core') diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 5fb502e2dd..9404bd7064 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -113,6 +113,7 @@ #include "src/core/lib/slice/slice_hash_table.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" +#include "src/core/lib/support/alloc_new.h" #include "src/core/lib/support/string.h" #include "src/core/lib/surface/call.h" #include "src/core/lib/surface/channel.h" @@ -397,7 +398,7 @@ typedef struct glb_lb_policy { grpc_slice lb_call_status_details; /** LB call retry backoff state */ - grpc_backoff lb_call_backoff_state; + grpc_core::Backoff* lb_call_backoff; /** LB call retry timer */ grpc_timer lb_call_retry_timer; @@ -986,6 +987,7 @@ static void glb_destroy(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) { } grpc_fake_resolver_response_generator_unref(glb_policy->response_generator); grpc_subchannel_index_unref(); + gpr_free(glb_policy->lb_call_backoff); gpr_free(glb_policy); } @@ -1150,7 +1152,7 @@ static void start_picking_locked(grpc_exec_ctx* exec_ctx, } glb_policy->started_picking = true; - grpc_backoff_reset(&glb_policy->lb_call_backoff_state); + glb_policy->lb_call_backoff->Reset(); query_for_backends_locked(exec_ctx, glb_policy); } @@ -1291,8 +1293,7 @@ static void maybe_restart_lb_call(grpc_exec_ctx* exec_ctx, } else if (!glb_policy->shutting_down) { /* if we aren't shutting down, restart the LB client call after some time */ grpc_millis next_try = - grpc_backoff_step(exec_ctx, &glb_policy->lb_call_backoff_state) - .next_attempt_start_time; + glb_policy->lb_call_backoff->Step(exec_ctx).next_attempt_start_time; if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_DEBUG, "[grpclb %p] Connection to LB server lost...", glb_policy); @@ -1461,12 +1462,14 @@ static void lb_call_init_locked(grpc_exec_ctx* exec_ctx, lb_on_response_received_locked, glb_policy, grpc_combiner_scheduler(glb_policy->base.combiner)); - grpc_backoff_init(&glb_policy->lb_call_backoff_state, - GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS * 1000, - GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER, - GRPC_GRPCLB_RECONNECT_JITTER, - GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS * 1000, - GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000); + grpc_core::Backoff::Options backoff_options; + backoff_options + .set_initial_backoff(GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS * 1000) + .set_multiplier(GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER) + .set_jitter(GRPC_GRPCLB_RECONNECT_JITTER) + .set_min_connect_timeout(GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS * 1000) + .set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000); + glb_policy->lb_call_backoff = GPR_NEW(grpc_core::Backoff(backoff_options)); glb_policy->seen_initial_response = false; glb_policy->last_client_load_report_counters_were_zero = false; @@ -1573,7 +1576,7 @@ static void lb_on_response_received_locked(grpc_exec_ctx* exec_ctx, void* arg, memset(ops, 0, sizeof(ops)); grpc_op* op = ops; if (glb_policy->lb_response_payload != nullptr) { - grpc_backoff_reset(&glb_policy->lb_call_backoff_state); + glb_policy->lb_call_backoff->Reset(); /* Received data from the LB server. Look inside * glb_policy->lb_response_payload, for a serverlist. */ grpc_byte_buffer_reader bbr; diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc index 07737b19d2..041e290bb8 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc @@ -39,6 +39,7 @@ #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/json/json.h" +#include "src/core/lib/support/alloc_new.h" #include "src/core/lib/support/env.h" #include "src/core/lib/support/string.h" #include "src/core/lib/transport/service_config.h" @@ -89,7 +90,7 @@ typedef struct { bool have_retry_timer; grpc_timer retry_timer; /** retry backoff state */ - grpc_backoff backoff_state; + grpc_core::Backoff* backoff; /** currently resolving addresses */ grpc_lb_addresses* lb_addresses; @@ -137,7 +138,7 @@ static void dns_ares_channel_saw_error_locked(grpc_exec_ctx* exec_ctx, grpc_resolver* resolver) { ares_dns_resolver* r = (ares_dns_resolver*)resolver; if (!r->resolving) { - grpc_backoff_reset(&r->backoff_state); + r->backoff->Reset(); dns_ares_start_resolving_locked(exec_ctx, r); } } @@ -272,8 +273,7 @@ static void dns_ares_on_resolved_locked(grpc_exec_ctx* exec_ctx, void* arg, } else { const char* msg = grpc_error_string(error); gpr_log(GPR_DEBUG, "dns resolution failed: %s", msg); - grpc_millis next_try = - grpc_backoff_step(exec_ctx, &r->backoff_state).next_attempt_start_time; + grpc_millis next_try = r->backoff->Step(exec_ctx).next_attempt_start_time; grpc_millis timeout = next_try - grpc_exec_ctx_now(exec_ctx); gpr_log(GPR_INFO, "dns resolution failed (will retry): %s", grpc_error_string(error)); @@ -307,7 +307,7 @@ static void dns_ares_next_locked(grpc_exec_ctx* exec_ctx, r->next_completion = on_complete; r->target_result = target_result; if (r->resolved_version == 0 && !r->resolving) { - grpc_backoff_reset(&r->backoff_state); + r->backoff->Reset(); dns_ares_start_resolving_locked(exec_ctx, r); } else { dns_ares_maybe_finish_next_locked(exec_ctx, r); @@ -353,6 +353,7 @@ static void dns_ares_destroy(grpc_exec_ctx* exec_ctx, grpc_resolver* gr) { gpr_free(r->name_to_resolve); gpr_free(r->default_port); grpc_channel_args_destroy(exec_ctx, r->channel_args); + gpr_free(r->backoff); gpr_free(r); } @@ -381,11 +382,14 @@ static grpc_resolver* dns_ares_create(grpc_exec_ctx* exec_ctx, grpc_pollset_set_add_pollset_set(exec_ctx, r->interested_parties, args->pollset_set); } - grpc_backoff_init( - &r->backoff_state, GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS * 1000, - GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER, GRPC_DNS_RECONNECT_JITTER, - GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS * 1000, - GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000); + grpc_core::Backoff::Options backoff_options; + backoff_options + .set_initial_backoff(GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS * 1000) + .set_multiplier(GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER) + .set_jitter(GRPC_DNS_RECONNECT_JITTER) + .set_min_connect_timeout(GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS * 1000) + .set_max_backoff(GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000); + r->backoff = GPR_NEW(grpc_core::Backoff(backoff_options)); GRPC_CLOSURE_INIT(&r->dns_ares_on_retry_timer_locked, dns_ares_on_retry_timer_locked, r, grpc_combiner_scheduler(r->base.combiner)); diff --git a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc index 589c74807f..35c909e2a4 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc @@ -32,6 +32,7 @@ #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/timer.h" +#include "src/core/lib/support/alloc_new.h" #include "src/core/lib/support/env.h" #include "src/core/lib/support/string.h" @@ -70,7 +71,7 @@ typedef struct { grpc_timer retry_timer; grpc_closure on_retry; /** retry backoff state */ - grpc_backoff backoff_state; + grpc_core::Backoff* backoff; /** currently resolving addresses */ grpc_resolved_addresses* addresses; @@ -113,7 +114,7 @@ static void dns_channel_saw_error_locked(grpc_exec_ctx* exec_ctx, grpc_resolver* resolver) { dns_resolver* r = (dns_resolver*)resolver; if (!r->resolving) { - grpc_backoff_reset(&r->backoff_state); + r->backoff->Reset(); dns_start_resolving_locked(exec_ctx, r); } } @@ -126,7 +127,7 @@ static void dns_next_locked(grpc_exec_ctx* exec_ctx, grpc_resolver* resolver, r->next_completion = on_complete; r->target_result = target_result; if (r->resolved_version == 0 && !r->resolving) { - grpc_backoff_reset(&r->backoff_state); + r->backoff->Reset(); dns_start_resolving_locked(exec_ctx, r); } else { dns_maybe_finish_next_locked(exec_ctx, r); @@ -170,8 +171,7 @@ static void dns_on_resolved_locked(grpc_exec_ctx* exec_ctx, void* arg, grpc_resolved_addresses_destroy(r->addresses); grpc_lb_addresses_destroy(exec_ctx, addresses); } else { - grpc_millis next_try = - grpc_backoff_step(exec_ctx, &r->backoff_state).next_attempt_start_time; + grpc_millis next_try = r->backoff->Step(exec_ctx).next_attempt_start_time; grpc_millis timeout = next_try - grpc_exec_ctx_now(exec_ctx); gpr_log(GPR_INFO, "dns resolution failed (will retry): %s", grpc_error_string(error)); @@ -233,6 +233,7 @@ static void dns_destroy(grpc_exec_ctx* exec_ctx, grpc_resolver* gr) { gpr_free(r->name_to_resolve); gpr_free(r->default_port); grpc_channel_args_destroy(exec_ctx, r->channel_args); + gpr_free(r->backoff); gpr_free(r); } @@ -257,11 +258,14 @@ static grpc_resolver* dns_create(grpc_exec_ctx* exec_ctx, grpc_pollset_set_add_pollset_set(exec_ctx, r->interested_parties, args->pollset_set); } - grpc_backoff_init( - &r->backoff_state, GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS * 1000, - GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER, GRPC_DNS_RECONNECT_JITTER, - GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS * 1000, - GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000); + grpc_core::Backoff::Options backoff_options; + backoff_options + .set_initial_backoff(GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS * 1000) + .set_multiplier(GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER) + .set_jitter(GRPC_DNS_RECONNECT_JITTER) + .set_min_connect_timeout(GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS * 1000) + .set_max_backoff(GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000); + r->backoff = GPR_NEW(grpc_core::Backoff(backoff_options)); return &r->base; } diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 58e294d597..a0b441a73a 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -39,6 +39,7 @@ #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" +#include "src/core/lib/support/alloc_new.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/surface/channel_init.h" #include "src/core/lib/transport/connectivity_state.h" @@ -72,6 +73,9 @@ typedef struct external_state_watcher { } external_state_watcher; struct grpc_subchannel { + grpc_subchannel(const grpc_core::Backoff::Options& backoff_options) + : backoff(backoff_options) {} + grpc_connector* connector; /** refcount @@ -118,8 +122,8 @@ struct grpc_subchannel { external_state_watcher root_external_state_watcher; /** backoff state */ - grpc_backoff backoff_state; - grpc_backoff_result backoff_result; + grpc_core::Backoff backoff; + grpc_core::Backoff::Result backoff_result; /** do we have an active alarm? */ bool have_alarm; @@ -283,6 +287,52 @@ void grpc_subchannel_weak_unref(grpc_exec_ctx* exec_ctx, } } +static grpc_core::Backoff::Options extract_backoff_options( + const grpc_channel_args* args) { + int initial_backoff_ms = + GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000; + int min_connect_timeout_ms = + GRPC_SUBCHANNEL_RECONNECT_MIN_BACKOFF_SECONDS * 1000; + int max_backoff_ms = GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000; + bool fixed_reconnect_backoff = false; + if (args != nullptr) { + for (size_t i = 0; i < args->num_args; i++) { + if (0 == strcmp(args->args[i].key, + "grpc.testing.fixed_reconnect_backoff_ms")) { + fixed_reconnect_backoff = true; + initial_backoff_ms = min_connect_timeout_ms = max_backoff_ms = + grpc_channel_arg_get_integer(&args->args[i], + {initial_backoff_ms, 100, INT_MAX}); + } else if (0 == + strcmp(args->args[i].key, GRPC_ARG_MIN_RECONNECT_BACKOFF_MS)) { + fixed_reconnect_backoff = false; + min_connect_timeout_ms = grpc_channel_arg_get_integer( + &args->args[i], {min_connect_timeout_ms, 100, INT_MAX}); + } else if (0 == + strcmp(args->args[i].key, GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) { + fixed_reconnect_backoff = false; + max_backoff_ms = grpc_channel_arg_get_integer( + &args->args[i], {max_backoff_ms, 100, INT_MAX}); + } else if (0 == strcmp(args->args[i].key, + GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS)) { + fixed_reconnect_backoff = false; + initial_backoff_ms = grpc_channel_arg_get_integer( + &args->args[i], {initial_backoff_ms, 100, INT_MAX}); + } + } + } + grpc_core::Backoff::Options backoff_options; + backoff_options.set_initial_backoff(initial_backoff_ms) + .set_multiplier(fixed_reconnect_backoff + ? 1.0 + : GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER) + .set_jitter(fixed_reconnect_backoff ? 0.0 + : GRPC_SUBCHANNEL_RECONNECT_JITTER) + .set_min_connect_timeout(min_connect_timeout_ms) + .set_max_backoff(max_backoff_ms); + return backoff_options; +} + grpc_subchannel* grpc_subchannel_create(grpc_exec_ctx* exec_ctx, grpc_connector* connector, const grpc_subchannel_args* args) { @@ -294,7 +344,7 @@ grpc_subchannel* grpc_subchannel_create(grpc_exec_ctx* exec_ctx, } GRPC_STATS_INC_CLIENT_SUBCHANNELS_CREATED(exec_ctx); - c = (grpc_subchannel*)gpr_zalloc(sizeof(*c)); + c = GPR_NEW(grpc_subchannel(extract_backoff_options(args->args))); c->key = key; gpr_atm_no_barrier_store(&c->ref_pair, 1 << INTERNAL_REF_BITS); c->connector = connector; @@ -336,7 +386,8 @@ grpc_subchannel* grpc_subchannel_create(grpc_exec_ctx* exec_ctx, "subchannel"); int initial_backoff_ms = GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000; - int min_backoff_ms = GRPC_SUBCHANNEL_RECONNECT_MIN_BACKOFF_SECONDS * 1000; + int min_connect_timeout_ms = + GRPC_SUBCHANNEL_RECONNECT_MIN_BACKOFF_SECONDS * 1000; int max_backoff_ms = GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000; bool fixed_reconnect_backoff = false; if (c->args) { @@ -344,14 +395,14 @@ grpc_subchannel* grpc_subchannel_create(grpc_exec_ctx* exec_ctx, if (0 == strcmp(c->args->args[i].key, "grpc.testing.fixed_reconnect_backoff_ms")) { fixed_reconnect_backoff = true; - initial_backoff_ms = min_backoff_ms = max_backoff_ms = + initial_backoff_ms = min_connect_timeout_ms = max_backoff_ms = grpc_channel_arg_get_integer(&c->args->args[i], {initial_backoff_ms, 100, INT_MAX}); } else if (0 == strcmp(c->args->args[i].key, GRPC_ARG_MIN_RECONNECT_BACKOFF_MS)) { fixed_reconnect_backoff = false; - min_backoff_ms = grpc_channel_arg_get_integer( - &c->args->args[i], {min_backoff_ms, 100, INT_MAX}); + min_connect_timeout_ms = grpc_channel_arg_get_integer( + &c->args->args[i], {min_connect_timeout_ms, 100, INT_MAX}); } else if (0 == strcmp(c->args->args[i].key, GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) { fixed_reconnect_backoff = false; @@ -365,12 +416,15 @@ grpc_subchannel* grpc_subchannel_create(grpc_exec_ctx* exec_ctx, } } } - grpc_backoff_init( - &c->backoff_state, initial_backoff_ms, - fixed_reconnect_backoff ? 1.0 - : GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER, - fixed_reconnect_backoff ? 0.0 : GRPC_SUBCHANNEL_RECONNECT_JITTER, - min_backoff_ms, max_backoff_ms); + grpc_core::Backoff::Options backoff_options; + backoff_options.set_initial_backoff(initial_backoff_ms) + .set_multiplier(fixed_reconnect_backoff + ? 1.0 + : GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER) + .set_jitter(fixed_reconnect_backoff ? 0.0 + : GRPC_SUBCHANNEL_RECONNECT_JITTER) + .set_min_connect_timeout(min_connect_timeout_ms) + .set_max_backoff(max_backoff_ms); gpr_mu_init(&c->mu); return grpc_subchannel_index_register(exec_ctx, key, c); @@ -429,7 +483,7 @@ static void on_alarm(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { } if (error == GRPC_ERROR_NONE) { gpr_log(GPR_INFO, "Failed to connect to channel, retrying"); - c->backoff_result = grpc_backoff_step(exec_ctx, &c->backoff_state); + c->backoff_result = c->backoff.Step(exec_ctx); continue_connect_locked(exec_ctx, c); gpr_mu_unlock(&c->mu); } else { @@ -466,7 +520,7 @@ static void maybe_start_connecting_locked(grpc_exec_ctx* exec_ctx, if (!c->backoff_begun) { c->backoff_begun = true; - c->backoff_result = grpc_backoff_begin(exec_ctx, &c->backoff_state); + c->backoff_result = c->backoff.Begin(exec_ctx); continue_connect_locked(exec_ctx, c); } else { GPR_ASSERT(!c->have_alarm); diff --git a/src/core/lib/backoff/backoff.cc b/src/core/lib/backoff/backoff.cc index dc754ddd82..7376ed6d91 100644 --- a/src/core/lib/backoff/backoff.cc +++ b/src/core/lib/backoff/backoff.cc @@ -18,63 +18,52 @@ #include "src/core/lib/backoff/backoff.h" +#include +#include + #include -void grpc_backoff_init(grpc_backoff* backoff, grpc_millis initial_backoff, - double multiplier, double jitter, - grpc_millis min_connect_timeout, - grpc_millis max_backoff) { - backoff->initial_backoff = initial_backoff; - backoff->multiplier = multiplier; - backoff->jitter = jitter; - backoff->min_connect_timeout = min_connect_timeout; - backoff->max_backoff = max_backoff; - backoff->rng_state = (uint32_t)gpr_now(GPR_CLOCK_REALTIME).tv_nsec; -} +namespace grpc_core { -grpc_backoff_result grpc_backoff_begin(grpc_exec_ctx* exec_ctx, - grpc_backoff* backoff) { - backoff->current_backoff = backoff->initial_backoff; - const grpc_millis initial_timeout = - GPR_MAX(backoff->initial_backoff, backoff->min_connect_timeout); - const grpc_millis now = grpc_exec_ctx_now(exec_ctx); - const grpc_backoff_result result = {now + initial_timeout, - now + backoff->current_backoff}; - return result; +namespace { +static double generate_uniform_random_number_between(double a, double b) { + if (a == b) return a; + if (a > b) GPR_SWAP(double, a, b); // make sure a < b + const double range = b - a; + const double zero_to_one_rand = rand() / (double)RAND_MAX; + return a + zero_to_one_rand * range; } +} // namespace -/* Generate a random number between 0 and 1. */ -static double generate_uniform_random_number(uint32_t* rng_state) { - *rng_state = (1103515245 * *rng_state + 12345) % ((uint32_t)1 << 31); - return *rng_state / (double)((uint32_t)1 << 31); +Backoff::Backoff(const Options& options) : options_(options) { + seed = (unsigned int)gpr_now(GPR_CLOCK_REALTIME).tv_nsec; } -static double generate_uniform_random_number_between(uint32_t* rng_state, - double a, double b) { - if (a == b) return a; - if (a > b) GPR_SWAP(double, a, b); // make sure a < b - const double range = b - a; - return a + generate_uniform_random_number(rng_state) * range; +Backoff::Result Backoff::Begin(grpc_exec_ctx* exec_ctx) { + current_backoff_ = options_.initial_backoff(); + const grpc_millis initial_timeout = + std::max(options_.initial_backoff(), options_.min_connect_timeout()); + const grpc_millis now = grpc_exec_ctx_now(exec_ctx); + return Backoff::Result{now + initial_timeout, now + current_backoff_}; } -grpc_backoff_result grpc_backoff_step(grpc_exec_ctx* exec_ctx, - grpc_backoff* backoff) { - backoff->current_backoff = (grpc_millis)(GPR_MIN( - backoff->current_backoff * backoff->multiplier, backoff->max_backoff)); +Backoff::Result Backoff::Step(grpc_exec_ctx* exec_ctx) { + current_backoff_ = + (grpc_millis)(std::min(current_backoff_ * options_.multiplier(), + (double)options_.max_backoff())); const double jitter = generate_uniform_random_number_between( - &backoff->rng_state, -backoff->jitter * backoff->current_backoff, - backoff->jitter * backoff->current_backoff); - const grpc_millis current_timeout = - GPR_MAX((grpc_millis)(backoff->current_backoff + jitter), - backoff->min_connect_timeout); - const grpc_millis next_timeout = GPR_MIN( - (grpc_millis)(backoff->current_backoff + jitter), backoff->max_backoff); + -options_.jitter() * current_backoff_, + options_.jitter() * current_backoff_); + const grpc_millis current_timeout = std::max( + (grpc_millis)(current_backoff_ + jitter), options_.min_connect_timeout()); + const grpc_millis next_timeout = std::min( + (grpc_millis)(current_backoff_ + jitter), options_.max_backoff()); const grpc_millis now = grpc_exec_ctx_now(exec_ctx); - const grpc_backoff_result result = {now + current_timeout, - now + next_timeout}; - return result; + return Backoff::Result{now + current_timeout, now + next_timeout}; } -void grpc_backoff_reset(grpc_backoff* backoff) { - backoff->current_backoff = backoff->initial_backoff; -} +void Backoff::Reset() { current_backoff_ = options_.initial_backoff(); } + +void Backoff::SetRandomSeed(uint32_t seed) { srand(seed); } + +} // namespace grpc_core diff --git a/src/core/lib/backoff/backoff.h b/src/core/lib/backoff/backoff.h index 1067281403..ab644b979a 100644 --- a/src/core/lib/backoff/backoff.h +++ b/src/core/lib/backoff/backoff.h @@ -21,63 +21,85 @@ #include "src/core/lib/iomgr/exec_ctx.h" -#ifdef __cplusplus -extern "C" { -#endif - -typedef struct { - /// const: how long to wait after the first failure before retrying - grpc_millis initial_backoff; - - /// const: factor with which to multiply backoff after a failed retry - double multiplier; - - /// const: amount to randomize backoffs - double jitter; - - /// const: minimum time between retries - grpc_millis min_connect_timeout; - - /// const: maximum time between retries - grpc_millis max_backoff; - +namespace grpc_core { + +class Backoff { + public: + class Options; + struct Result; + + /// Initialize backoff machinery - does not need to be destroyed + explicit Backoff(const Options& options); + + /// Begin retry loop: returns the deadlines to be used for the current attempt + /// and the subsequent retry, if any. + Result Begin(grpc_exec_ctx* exec_ctx); + /// Step a retry loop: returns the deadlines to be used for the current + /// attempt and the subsequent retry, if any. + Result Step(grpc_exec_ctx* exec_ctx); + /// Reset the backoff, so the next grpc_backoff_step will be a + /// grpc_backoff_begin. + void Reset(); + + void SetRandomSeed(unsigned int seed); + + class Options { + public: + Options& set_initial_backoff(grpc_millis initial_backoff) { + initial_backoff_ = initial_backoff; + return *this; + } + Options& set_multiplier(double multiplier) { + multiplier_ = multiplier; + return *this; + } + Options& set_jitter(double jitter) { + jitter_ = jitter; + return *this; + } + Options& set_min_connect_timeout(grpc_millis min_connect_timeout) { + min_connect_timeout_ = min_connect_timeout; + return *this; + } + Options& set_max_backoff(grpc_millis max_backoff) { + max_backoff_ = max_backoff; + return *this; + } + /// how long to wait after the first failure before retrying + grpc_millis initial_backoff() const { return initial_backoff_; } + /// factor with which to multiply backoff after a failed retry + double multiplier() const { return multiplier_; } + /// amount to randomize backoffs + double jitter() const { return jitter_; } + /// minimum time between retries + grpc_millis min_connect_timeout() const { return min_connect_timeout_; } + /// maximum time between retries + grpc_millis max_backoff() const { return max_backoff_; } + + private: + grpc_millis initial_backoff_; + double multiplier_; + double jitter_; + grpc_millis min_connect_timeout_; + grpc_millis max_backoff_; + }; // class Options + + struct Result { + /// Deadline to be used for the current attempt. + grpc_millis current_deadline; + /// Deadline to be used for the next attempt, following the backoff + /// strategy. + grpc_millis next_attempt_start_time; + }; + + private: + const Options options_; /// current delay before retries - grpc_millis current_backoff; - - /// random number generator - uint32_t rng_state; -} grpc_backoff; - -typedef struct { - /// Deadline to be used for the current attempt. - grpc_millis current_deadline; - - /// Deadline to be used for the next attempt, following the backoff strategy. - grpc_millis next_attempt_start_time; -} grpc_backoff_result; - -/// Initialize backoff machinery - does not need to be destroyed -void grpc_backoff_init(grpc_backoff* backoff, grpc_millis initial_backoff, - double multiplier, double jitter, - grpc_millis min_connect_timeout, - grpc_millis max_backoff); - -/// Begin retry loop: returns the deadlines to be used for the current attempt -/// and the subsequent retry, if any. -grpc_backoff_result grpc_backoff_begin(grpc_exec_ctx* exec_ctx, - grpc_backoff* backoff); - -/// Step a retry loop: returns the deadlines to be used for the current attempt -/// and the subsequent retry, if any. -grpc_backoff_result grpc_backoff_step(grpc_exec_ctx* exec_ctx, - grpc_backoff* backoff); + grpc_millis current_backoff_; -/// Reset the backoff, so the next grpc_backoff_step will be a -/// grpc_backoff_begin. -void grpc_backoff_reset(grpc_backoff* backoff); + unsigned int seed; +}; -#ifdef __cplusplus -} -#endif +} // namespace grpc_core #endif /* GRPC_CORE_LIB_BACKOFF_BACKOFF_H */ diff --git a/src/core/lib/support/alloc_new.h b/src/core/lib/support/alloc_new.h new file mode 100644 index 0000000000..314c114cd9 --- /dev/null +++ b/src/core/lib/support/alloc_new.h @@ -0,0 +1,30 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_SUPPORT_ALLOC_NEW_H +#define GRPC_SUPPORT_ALLOC_NEW_H + +#include + +#define GPR_NEW(expr) new (gpr_zalloc) expr + +inline void* operator new(size_t sz, void* (*alloc_fn)(size_t)) { + return alloc_fn(sz); +} + +#endif /* GRPC_SUPPORT_ALLOC_NEW_H */ -- cgit v1.2.3 From d27e2422ccf0e580f2f85a91c2f3919076693257 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Mon, 27 Nov 2017 12:53:14 -0800 Subject: Use ManualConstructor --- .../client_channel/lb_policy/grpclb/grpclb.cc | 8 ++-- .../resolver/dns/c_ares/dns_resolver_ares.cc | 7 ++- .../resolver/dns/native/dns_resolver.cc | 7 ++- src/core/ext/filters/client_channel/subchannel.cc | 55 +++------------------- src/core/lib/support/alloc_new.h | 30 ------------ 5 files changed, 16 insertions(+), 91 deletions(-) delete mode 100644 src/core/lib/support/alloc_new.h (limited to 'src/core') diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 9404bd7064..cc55925758 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -113,7 +113,7 @@ #include "src/core/lib/slice/slice_hash_table.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" -#include "src/core/lib/support/alloc_new.h" +#include "src/core/lib/support/manual_constructor.h" #include "src/core/lib/support/string.h" #include "src/core/lib/surface/call.h" #include "src/core/lib/surface/channel.h" @@ -398,7 +398,7 @@ typedef struct glb_lb_policy { grpc_slice lb_call_status_details; /** LB call retry backoff state */ - grpc_core::Backoff* lb_call_backoff; + grpc_core::ManualConstructor lb_call_backoff; /** LB call retry timer */ grpc_timer lb_call_retry_timer; @@ -987,7 +987,6 @@ static void glb_destroy(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) { } grpc_fake_resolver_response_generator_unref(glb_policy->response_generator); grpc_subchannel_index_unref(); - gpr_free(glb_policy->lb_call_backoff); gpr_free(glb_policy); } @@ -1469,7 +1468,8 @@ static void lb_call_init_locked(grpc_exec_ctx* exec_ctx, .set_jitter(GRPC_GRPCLB_RECONNECT_JITTER) .set_min_connect_timeout(GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS * 1000) .set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000); - glb_policy->lb_call_backoff = GPR_NEW(grpc_core::Backoff(backoff_options)); + + glb_policy->lb_call_backoff.Init(backoff_options); glb_policy->seen_initial_response = false; glb_policy->last_client_load_report_counters_were_zero = false; diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc index 041e290bb8..ff6689b736 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc @@ -39,8 +39,8 @@ #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/json/json.h" -#include "src/core/lib/support/alloc_new.h" #include "src/core/lib/support/env.h" +#include "src/core/lib/support/manual_constructor.h" #include "src/core/lib/support/string.h" #include "src/core/lib/transport/service_config.h" @@ -90,7 +90,7 @@ typedef struct { bool have_retry_timer; grpc_timer retry_timer; /** retry backoff state */ - grpc_core::Backoff* backoff; + grpc_core::ManualConstructor backoff; /** currently resolving addresses */ grpc_lb_addresses* lb_addresses; @@ -353,7 +353,6 @@ static void dns_ares_destroy(grpc_exec_ctx* exec_ctx, grpc_resolver* gr) { gpr_free(r->name_to_resolve); gpr_free(r->default_port); grpc_channel_args_destroy(exec_ctx, r->channel_args); - gpr_free(r->backoff); gpr_free(r); } @@ -389,7 +388,7 @@ static grpc_resolver* dns_ares_create(grpc_exec_ctx* exec_ctx, .set_jitter(GRPC_DNS_RECONNECT_JITTER) .set_min_connect_timeout(GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS * 1000) .set_max_backoff(GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000); - r->backoff = GPR_NEW(grpc_core::Backoff(backoff_options)); + r->backoff.Init(grpc_core::Backoff(backoff_options)); GRPC_CLOSURE_INIT(&r->dns_ares_on_retry_timer_locked, dns_ares_on_retry_timer_locked, r, grpc_combiner_scheduler(r->base.combiner)); diff --git a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc index 35c909e2a4..81f292f4d9 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc @@ -32,8 +32,8 @@ #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/timer.h" -#include "src/core/lib/support/alloc_new.h" #include "src/core/lib/support/env.h" +#include "src/core/lib/support/manual_constructor.h" #include "src/core/lib/support/string.h" #define GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS 1 @@ -71,7 +71,7 @@ typedef struct { grpc_timer retry_timer; grpc_closure on_retry; /** retry backoff state */ - grpc_core::Backoff* backoff; + grpc_core::ManualConstructor backoff; /** currently resolving addresses */ grpc_resolved_addresses* addresses; @@ -233,7 +233,6 @@ static void dns_destroy(grpc_exec_ctx* exec_ctx, grpc_resolver* gr) { gpr_free(r->name_to_resolve); gpr_free(r->default_port); grpc_channel_args_destroy(exec_ctx, r->channel_args); - gpr_free(r->backoff); gpr_free(r); } @@ -265,7 +264,7 @@ static grpc_resolver* dns_create(grpc_exec_ctx* exec_ctx, .set_jitter(GRPC_DNS_RECONNECT_JITTER) .set_min_connect_timeout(GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS * 1000) .set_max_backoff(GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000); - r->backoff = GPR_NEW(grpc_core::Backoff(backoff_options)); + r->backoff.Init(grpc_core::Backoff(backoff_options)); return &r->base; } diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index a0b441a73a..945c237d2c 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -39,7 +39,7 @@ #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" -#include "src/core/lib/support/alloc_new.h" +#include "src/core/lib/support/manual_constructor.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/surface/channel_init.h" #include "src/core/lib/transport/connectivity_state.h" @@ -73,9 +73,6 @@ typedef struct external_state_watcher { } external_state_watcher; struct grpc_subchannel { - grpc_subchannel(const grpc_core::Backoff::Options& backoff_options) - : backoff(backoff_options) {} - grpc_connector* connector; /** refcount @@ -122,7 +119,7 @@ struct grpc_subchannel { external_state_watcher root_external_state_watcher; /** backoff state */ - grpc_core::Backoff backoff; + grpc_core::ManualConstructor backoff; grpc_core::Backoff::Result backoff_result; /** do we have an active alarm? */ @@ -344,7 +341,7 @@ grpc_subchannel* grpc_subchannel_create(grpc_exec_ctx* exec_ctx, } GRPC_STATS_INC_CLIENT_SUBCHANNELS_CREATED(exec_ctx); - c = GPR_NEW(grpc_subchannel(extract_backoff_options(args->args))); + c = (grpc_subchannel*)gpr_zalloc(sizeof(*c)); c->key = key; gpr_atm_no_barrier_store(&c->ref_pair, 1 << INTERNAL_REF_BITS); c->connector = connector; @@ -384,47 +381,7 @@ grpc_subchannel* grpc_subchannel_create(grpc_exec_ctx* exec_ctx, grpc_schedule_on_exec_ctx); grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE, "subchannel"); - int initial_backoff_ms = - GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000; - int min_connect_timeout_ms = - GRPC_SUBCHANNEL_RECONNECT_MIN_BACKOFF_SECONDS * 1000; - int max_backoff_ms = GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000; - bool fixed_reconnect_backoff = false; - if (c->args) { - for (size_t i = 0; i < c->args->num_args; i++) { - if (0 == strcmp(c->args->args[i].key, - "grpc.testing.fixed_reconnect_backoff_ms")) { - fixed_reconnect_backoff = true; - initial_backoff_ms = min_connect_timeout_ms = max_backoff_ms = - grpc_channel_arg_get_integer(&c->args->args[i], - {initial_backoff_ms, 100, INT_MAX}); - } else if (0 == strcmp(c->args->args[i].key, - GRPC_ARG_MIN_RECONNECT_BACKOFF_MS)) { - fixed_reconnect_backoff = false; - min_connect_timeout_ms = grpc_channel_arg_get_integer( - &c->args->args[i], {min_connect_timeout_ms, 100, INT_MAX}); - } else if (0 == strcmp(c->args->args[i].key, - GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) { - fixed_reconnect_backoff = false; - max_backoff_ms = grpc_channel_arg_get_integer( - &c->args->args[i], {max_backoff_ms, 100, INT_MAX}); - } else if (0 == strcmp(c->args->args[i].key, - GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS)) { - fixed_reconnect_backoff = false; - initial_backoff_ms = grpc_channel_arg_get_integer( - &c->args->args[i], {initial_backoff_ms, 100, INT_MAX}); - } - } - } - grpc_core::Backoff::Options backoff_options; - backoff_options.set_initial_backoff(initial_backoff_ms) - .set_multiplier(fixed_reconnect_backoff - ? 1.0 - : GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER) - .set_jitter(fixed_reconnect_backoff ? 0.0 - : GRPC_SUBCHANNEL_RECONNECT_JITTER) - .set_min_connect_timeout(min_connect_timeout_ms) - .set_max_backoff(max_backoff_ms); + c->backoff.Init(extract_backoff_options(args->args)); gpr_mu_init(&c->mu); return grpc_subchannel_index_register(exec_ctx, key, c); @@ -483,7 +440,7 @@ static void on_alarm(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { } if (error == GRPC_ERROR_NONE) { gpr_log(GPR_INFO, "Failed to connect to channel, retrying"); - c->backoff_result = c->backoff.Step(exec_ctx); + c->backoff_result = c->backoff->Step(exec_ctx); continue_connect_locked(exec_ctx, c); gpr_mu_unlock(&c->mu); } else { @@ -520,7 +477,7 @@ static void maybe_start_connecting_locked(grpc_exec_ctx* exec_ctx, if (!c->backoff_begun) { c->backoff_begun = true; - c->backoff_result = c->backoff.Begin(exec_ctx); + c->backoff_result = c->backoff->Begin(exec_ctx); continue_connect_locked(exec_ctx, c); } else { GPR_ASSERT(!c->have_alarm); diff --git a/src/core/lib/support/alloc_new.h b/src/core/lib/support/alloc_new.h deleted file mode 100644 index 314c114cd9..0000000000 --- a/src/core/lib/support/alloc_new.h +++ /dev/null @@ -1,30 +0,0 @@ -/* - * - * Copyright 2017 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#ifndef GRPC_SUPPORT_ALLOC_NEW_H -#define GRPC_SUPPORT_ALLOC_NEW_H - -#include - -#define GPR_NEW(expr) new (gpr_zalloc) expr - -inline void* operator new(size_t sz, void* (*alloc_fn)(size_t)) { - return alloc_fn(sz); -} - -#endif /* GRPC_SUPPORT_ALLOC_NEW_H */ -- cgit v1.2.3 From cfac788ed042f6b29c4ed83f8a8e90bded41f0d9 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Mon, 27 Nov 2017 12:59:37 -0800 Subject: Backoff class docstring --- src/core/lib/backoff/backoff.h | 2 ++ 1 file changed, 2 insertions(+) (limited to 'src/core') diff --git a/src/core/lib/backoff/backoff.h b/src/core/lib/backoff/backoff.h index ab644b979a..cc2a9914ec 100644 --- a/src/core/lib/backoff/backoff.h +++ b/src/core/lib/backoff/backoff.h @@ -23,6 +23,8 @@ namespace grpc_core { +/// Implementation of the backoff mechanism described in +/// doc/connection-backoff.md class Backoff { public: class Options; -- cgit v1.2.3 From d3f6eae6694189bc9547acc6a95f9c94add4508b Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Tue, 28 Nov 2017 13:24:45 -0800 Subject: Restored our own RNG impl --- src/core/lib/backoff/backoff.cc | 21 ++++++++++++++------- src/core/lib/backoff/backoff.h | 3 +-- 2 files changed, 15 insertions(+), 9 deletions(-) (limited to 'src/core') diff --git a/src/core/lib/backoff/backoff.cc b/src/core/lib/backoff/backoff.cc index 7376ed6d91..a5128a7e6e 100644 --- a/src/core/lib/backoff/backoff.cc +++ b/src/core/lib/backoff/backoff.cc @@ -19,24 +19,31 @@ #include "src/core/lib/backoff/backoff.h" #include -#include #include namespace grpc_core { namespace { -static double generate_uniform_random_number_between(double a, double b) { + +/* Generate a random number between 0 and 1. We roll our own RNG because seeding + * rand() modifies a global variable we have no control over. */ +double generate_uniform_random_number(uint32_t* rng_state) { + *rng_state = (1103515245 * *rng_state + 12345) % ((uint32_t)1 << 31); + return *rng_state / (double)((uint32_t)1 << 31); +} + +double generate_uniform_random_number_between(uint32_t* rng_state, double a, + double b) { if (a == b) return a; if (a > b) GPR_SWAP(double, a, b); // make sure a < b const double range = b - a; - const double zero_to_one_rand = rand() / (double)RAND_MAX; - return a + zero_to_one_rand * range; + return a + generate_uniform_random_number(rng_state) * range; } } // namespace Backoff::Backoff(const Options& options) : options_(options) { - seed = (unsigned int)gpr_now(GPR_CLOCK_REALTIME).tv_nsec; + rng_state_ = (unsigned int)gpr_now(GPR_CLOCK_REALTIME).tv_nsec; } Backoff::Result Backoff::Begin(grpc_exec_ctx* exec_ctx) { @@ -52,7 +59,7 @@ Backoff::Result Backoff::Step(grpc_exec_ctx* exec_ctx) { (grpc_millis)(std::min(current_backoff_ * options_.multiplier(), (double)options_.max_backoff())); const double jitter = generate_uniform_random_number_between( - -options_.jitter() * current_backoff_, + &rng_state_, -options_.jitter() * current_backoff_, options_.jitter() * current_backoff_); const grpc_millis current_timeout = std::max( (grpc_millis)(current_backoff_ + jitter), options_.min_connect_timeout()); @@ -64,6 +71,6 @@ Backoff::Result Backoff::Step(grpc_exec_ctx* exec_ctx) { void Backoff::Reset() { current_backoff_ = options_.initial_backoff(); } -void Backoff::SetRandomSeed(uint32_t seed) { srand(seed); } +void Backoff::SetRandomSeed(uint32_t seed) { rng_state_ = seed; } } // namespace grpc_core diff --git a/src/core/lib/backoff/backoff.h b/src/core/lib/backoff/backoff.h index cc2a9914ec..3e5241c226 100644 --- a/src/core/lib/backoff/backoff.h +++ b/src/core/lib/backoff/backoff.h @@ -98,8 +98,7 @@ class Backoff { const Options options_; /// current delay before retries grpc_millis current_backoff_; - - unsigned int seed; + uint32_t rng_state_; }; } // namespace grpc_core -- cgit v1.2.3 From 0f91e513d9dc9bee529701ba254933eb7be07b38 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Mon, 4 Dec 2017 16:12:54 -0800 Subject: Cleaned up API. Backoff now returns a single value: the time of the next retry --- .../client_channel/lb_policy/grpclb/grpclb.cc | 7 +- .../resolver/dns/c_ares/dns_resolver_ares.cc | 8 +- .../resolver/dns/native/dns_resolver.cc | 8 +- src/core/ext/filters/client_channel/subchannel.cc | 32 +++-- src/core/lib/backoff/backoff.cc | 27 ++-- src/core/lib/backoff/backoff.h | 25 ++-- test/core/backoff/backoff_test.cc | 144 +++++++++------------ 7 files changed, 112 insertions(+), 139 deletions(-) (limited to 'src/core') diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index cc55925758..3c4e7d0270 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -398,7 +398,7 @@ typedef struct glb_lb_policy { grpc_slice lb_call_status_details; /** LB call retry backoff state */ - grpc_core::ManualConstructor lb_call_backoff; + grpc_core::ManualConstructor lb_call_backoff; /** LB call retry timer */ grpc_timer lb_call_retry_timer; @@ -1291,8 +1291,7 @@ static void maybe_restart_lb_call(grpc_exec_ctx* exec_ctx, glb_policy->updating_lb_call = false; } else if (!glb_policy->shutting_down) { /* if we aren't shutting down, restart the LB client call after some time */ - grpc_millis next_try = - glb_policy->lb_call_backoff->Step(exec_ctx).next_attempt_start_time; + grpc_millis next_try = glb_policy->lb_call_backoff->Step(exec_ctx); if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_DEBUG, "[grpclb %p] Connection to LB server lost...", glb_policy); @@ -1461,7 +1460,7 @@ static void lb_call_init_locked(grpc_exec_ctx* exec_ctx, lb_on_response_received_locked, glb_policy, grpc_combiner_scheduler(glb_policy->base.combiner)); - grpc_core::Backoff::Options backoff_options; + grpc_core::BackOff::Options backoff_options; backoff_options .set_initial_backoff(GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS * 1000) .set_multiplier(GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER) diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc index ff6689b736..c3d0e87a83 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc @@ -90,7 +90,7 @@ typedef struct { bool have_retry_timer; grpc_timer retry_timer; /** retry backoff state */ - grpc_core::ManualConstructor backoff; + grpc_core::ManualConstructor backoff; /** currently resolving addresses */ grpc_lb_addresses* lb_addresses; @@ -273,7 +273,7 @@ static void dns_ares_on_resolved_locked(grpc_exec_ctx* exec_ctx, void* arg, } else { const char* msg = grpc_error_string(error); gpr_log(GPR_DEBUG, "dns resolution failed: %s", msg); - grpc_millis next_try = r->backoff->Step(exec_ctx).next_attempt_start_time; + grpc_millis next_try = r->backoff->Step(exec_ctx); grpc_millis timeout = next_try - grpc_exec_ctx_now(exec_ctx); gpr_log(GPR_INFO, "dns resolution failed (will retry): %s", grpc_error_string(error)); @@ -381,14 +381,14 @@ static grpc_resolver* dns_ares_create(grpc_exec_ctx* exec_ctx, grpc_pollset_set_add_pollset_set(exec_ctx, r->interested_parties, args->pollset_set); } - grpc_core::Backoff::Options backoff_options; + grpc_core::BackOff::Options backoff_options; backoff_options .set_initial_backoff(GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS * 1000) .set_multiplier(GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER) .set_jitter(GRPC_DNS_RECONNECT_JITTER) .set_min_connect_timeout(GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS * 1000) .set_max_backoff(GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000); - r->backoff.Init(grpc_core::Backoff(backoff_options)); + r->backoff.Init(grpc_core::BackOff(backoff_options)); GRPC_CLOSURE_INIT(&r->dns_ares_on_retry_timer_locked, dns_ares_on_retry_timer_locked, r, grpc_combiner_scheduler(r->base.combiner)); diff --git a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc index 81f292f4d9..e1f5ca87d4 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc @@ -71,7 +71,7 @@ typedef struct { grpc_timer retry_timer; grpc_closure on_retry; /** retry backoff state */ - grpc_core::ManualConstructor backoff; + grpc_core::ManualConstructor backoff; /** currently resolving addresses */ grpc_resolved_addresses* addresses; @@ -171,7 +171,7 @@ static void dns_on_resolved_locked(grpc_exec_ctx* exec_ctx, void* arg, grpc_resolved_addresses_destroy(r->addresses); grpc_lb_addresses_destroy(exec_ctx, addresses); } else { - grpc_millis next_try = r->backoff->Step(exec_ctx).next_attempt_start_time; + grpc_millis next_try = r->backoff->Step(exec_ctx); grpc_millis timeout = next_try - grpc_exec_ctx_now(exec_ctx); gpr_log(GPR_INFO, "dns resolution failed (will retry): %s", grpc_error_string(error)); @@ -257,14 +257,14 @@ static grpc_resolver* dns_create(grpc_exec_ctx* exec_ctx, grpc_pollset_set_add_pollset_set(exec_ctx, r->interested_parties, args->pollset_set); } - grpc_core::Backoff::Options backoff_options; + grpc_core::BackOff::Options backoff_options; backoff_options .set_initial_backoff(GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS * 1000) .set_multiplier(GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER) .set_jitter(GRPC_DNS_RECONNECT_JITTER) .set_min_connect_timeout(GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS * 1000) .set_max_backoff(GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000); - r->backoff.Init(grpc_core::Backoff(backoff_options)); + r->backoff.Init(grpc_core::BackOff(backoff_options)); return &r->base; } diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 945c237d2c..dbf66ae71a 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -20,7 +20,9 @@ #include #include -#include + +#include +#include #include #include @@ -49,7 +51,7 @@ #define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1 #define GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER 1.6 -#define GRPC_SUBCHANNEL_RECONNECT_MIN_BACKOFF_SECONDS 20 +#define GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS 20 #define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120 #define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2 @@ -119,8 +121,8 @@ struct grpc_subchannel { external_state_watcher root_external_state_watcher; /** backoff state */ - grpc_core::ManualConstructor backoff; - grpc_core::Backoff::Result backoff_result; + grpc_core::ManualConstructor backoff; + grpc_millis next_attempt_deadline; /** do we have an active alarm? */ bool have_alarm; @@ -284,12 +286,12 @@ void grpc_subchannel_weak_unref(grpc_exec_ctx* exec_ctx, } } -static grpc_core::Backoff::Options extract_backoff_options( +static grpc_core::BackOff::Options extract_backoff_options( const grpc_channel_args* args) { int initial_backoff_ms = GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000; int min_connect_timeout_ms = - GRPC_SUBCHANNEL_RECONNECT_MIN_BACKOFF_SECONDS * 1000; + GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS * 1000; int max_backoff_ms = GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000; bool fixed_reconnect_backoff = false; if (args != nullptr) { @@ -318,7 +320,7 @@ static grpc_core::Backoff::Options extract_backoff_options( } } } - grpc_core::Backoff::Options backoff_options; + grpc_core::BackOff::Options backoff_options; backoff_options.set_initial_backoff(initial_backoff_ms) .set_multiplier(fixed_reconnect_backoff ? 1.0 @@ -392,7 +394,7 @@ static void continue_connect_locked(grpc_exec_ctx* exec_ctx, grpc_connect_in_args args; args.interested_parties = c->pollset_set; - args.deadline = c->backoff_result.current_deadline; + args.deadline = c->next_attempt_deadline; args.channel_args = c->args; grpc_connectivity_state_set(exec_ctx, &c->state_tracker, @@ -440,7 +442,11 @@ static void on_alarm(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { } if (error == GRPC_ERROR_NONE) { gpr_log(GPR_INFO, "Failed to connect to channel, retrying"); - c->backoff_result = c->backoff->Step(exec_ctx); + const grpc_millis min_deadline = + (GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS * 1000) + + grpc_exec_ctx_now(exec_ctx); + c->next_attempt_deadline = + std::max(c->backoff->Step(exec_ctx), min_deadline); continue_connect_locked(exec_ctx, c); gpr_mu_unlock(&c->mu); } else { @@ -477,21 +483,21 @@ static void maybe_start_connecting_locked(grpc_exec_ctx* exec_ctx, if (!c->backoff_begun) { c->backoff_begun = true; - c->backoff_result = c->backoff->Begin(exec_ctx); + c->next_attempt_deadline = c->backoff->Begin(exec_ctx); continue_connect_locked(exec_ctx, c); } else { GPR_ASSERT(!c->have_alarm); c->have_alarm = true; const grpc_millis time_til_next = - c->backoff_result.next_attempt_start_time - grpc_exec_ctx_now(exec_ctx); + c->next_attempt_deadline - grpc_exec_ctx_now(exec_ctx); if (time_til_next <= 0) { gpr_log(GPR_INFO, "Retry immediately"); } else { gpr_log(GPR_INFO, "Retry in %" PRIdPTR " milliseconds", time_til_next); } GRPC_CLOSURE_INIT(&c->on_alarm, on_alarm, c, grpc_schedule_on_exec_ctx); - grpc_timer_init(exec_ctx, &c->alarm, - c->backoff_result.next_attempt_start_time, &c->on_alarm); + grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt_deadline, + &c->on_alarm); } } diff --git a/src/core/lib/backoff/backoff.cc b/src/core/lib/backoff/backoff.cc index a5128a7e6e..5553e6b3f3 100644 --- a/src/core/lib/backoff/backoff.cc +++ b/src/core/lib/backoff/backoff.cc @@ -29,8 +29,9 @@ namespace { /* Generate a random number between 0 and 1. We roll our own RNG because seeding * rand() modifies a global variable we have no control over. */ double generate_uniform_random_number(uint32_t* rng_state) { - *rng_state = (1103515245 * *rng_state + 12345) % ((uint32_t)1 << 31); - return *rng_state / (double)((uint32_t)1 << 31); + constexpr uint32_t two_raise_31 = uint32_t(1) << 31; + *rng_state = (1103515245 * *rng_state + 12345) % two_raise_31; + return *rng_state / static_cast(two_raise_31); } double generate_uniform_random_number_between(uint32_t* rng_state, double a, @@ -42,35 +43,29 @@ double generate_uniform_random_number_between(uint32_t* rng_state, double a, } } // namespace -Backoff::Backoff(const Options& options) : options_(options) { - rng_state_ = (unsigned int)gpr_now(GPR_CLOCK_REALTIME).tv_nsec; +BackOff::BackOff(const Options& options) : options_(options) { + rng_state_ = static_cast(gpr_now(GPR_CLOCK_REALTIME).tv_nsec); } -Backoff::Result Backoff::Begin(grpc_exec_ctx* exec_ctx) { +grpc_millis BackOff::Begin(grpc_exec_ctx* exec_ctx) { current_backoff_ = options_.initial_backoff(); - const grpc_millis initial_timeout = - std::max(options_.initial_backoff(), options_.min_connect_timeout()); - const grpc_millis now = grpc_exec_ctx_now(exec_ctx); - return Backoff::Result{now + initial_timeout, now + current_backoff_}; + return current_backoff_ + grpc_exec_ctx_now(exec_ctx); } -Backoff::Result Backoff::Step(grpc_exec_ctx* exec_ctx) { +grpc_millis BackOff::Step(grpc_exec_ctx* exec_ctx) { current_backoff_ = (grpc_millis)(std::min(current_backoff_ * options_.multiplier(), (double)options_.max_backoff())); const double jitter = generate_uniform_random_number_between( &rng_state_, -options_.jitter() * current_backoff_, options_.jitter() * current_backoff_); - const grpc_millis current_timeout = std::max( - (grpc_millis)(current_backoff_ + jitter), options_.min_connect_timeout()); const grpc_millis next_timeout = std::min( (grpc_millis)(current_backoff_ + jitter), options_.max_backoff()); - const grpc_millis now = grpc_exec_ctx_now(exec_ctx); - return Backoff::Result{now + current_timeout, now + next_timeout}; + return next_timeout + grpc_exec_ctx_now(exec_ctx); } -void Backoff::Reset() { current_backoff_ = options_.initial_backoff(); } +void BackOff::Reset() { current_backoff_ = options_.initial_backoff(); } -void Backoff::SetRandomSeed(uint32_t seed) { rng_state_ = seed; } +void BackOff::SetRandomSeed(uint32_t seed) { rng_state_ = seed; } } // namespace grpc_core diff --git a/src/core/lib/backoff/backoff.h b/src/core/lib/backoff/backoff.h index 3e5241c226..5ba05e1d75 100644 --- a/src/core/lib/backoff/backoff.h +++ b/src/core/lib/backoff/backoff.h @@ -25,20 +25,19 @@ namespace grpc_core { /// Implementation of the backoff mechanism described in /// doc/connection-backoff.md -class Backoff { +class BackOff { public: class Options; - struct Result; /// Initialize backoff machinery - does not need to be destroyed - explicit Backoff(const Options& options); + explicit BackOff(const Options& options); - /// Begin retry loop: returns the deadlines to be used for the current attempt - /// and the subsequent retry, if any. - Result Begin(grpc_exec_ctx* exec_ctx); - /// Step a retry loop: returns the deadlines to be used for the current - /// attempt and the subsequent retry, if any. - Result Step(grpc_exec_ctx* exec_ctx); + /// Begin retry loop: returns the deadline to be used for the next attempt, + /// following the backoff / strategy. + grpc_millis Begin(grpc_exec_ctx* exec_ctx); + /// Step a retry loop: returns returns the deadline to be used for the next + /// attempt, / following the backoff / strategy. + grpc_millis Step(grpc_exec_ctx* exec_ctx); /// Reset the backoff, so the next grpc_backoff_step will be a /// grpc_backoff_begin. void Reset(); @@ -86,14 +85,6 @@ class Backoff { grpc_millis max_backoff_; }; // class Options - struct Result { - /// Deadline to be used for the current attempt. - grpc_millis current_deadline; - /// Deadline to be used for the next attempt, following the backoff - /// strategy. - grpc_millis next_attempt_start_time; - }; - private: const Options options_; /// current delay before retries diff --git a/test/core/backoff/backoff_test.cc b/test/core/backoff/backoff_test.cc index 478d3b325d..e0b72cfce0 100644 --- a/test/core/backoff/backoff_test.cc +++ b/test/core/backoff/backoff_test.cc @@ -18,6 +18,8 @@ #include "src/core/lib/backoff/backoff.h" +#include + #include #include @@ -28,7 +30,7 @@ namespace grpc { namespace testing { namespace { -using grpc_core::Backoff; +using grpc_core::BackOff; TEST(BackOffTest, ConstantBackOff) { const grpc_millis initial_backoff = 200; @@ -36,29 +38,23 @@ TEST(BackOffTest, ConstantBackOff) { const double jitter = 0.0; const grpc_millis min_connect_timeout = 100; const grpc_millis max_backoff = 1000; - Backoff::Options options; + BackOff::Options options; options.set_initial_backoff(initial_backoff) .set_multiplier(multiplier) .set_jitter(jitter) .set_min_connect_timeout(min_connect_timeout) .set_max_backoff(max_backoff); - Backoff backoff(options); + BackOff backoff(options); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - Backoff::Result next_deadlines = backoff.Begin(&exec_ctx); - GPR_ASSERT(next_deadlines.current_deadline - grpc_exec_ctx_now(&exec_ctx) == - initial_backoff); - GPR_ASSERT(next_deadlines.next_attempt_start_time - - grpc_exec_ctx_now(&exec_ctx) == - initial_backoff); + grpc_millis next_attempt_start_time = backoff.Begin(&exec_ctx); + EXPECT_EQ(next_attempt_start_time - grpc_exec_ctx_now(&exec_ctx), + initial_backoff); for (int i = 0; i < 10000; i++) { - next_deadlines = backoff.Step(&exec_ctx); - GPR_ASSERT(next_deadlines.current_deadline - grpc_exec_ctx_now(&exec_ctx) == - initial_backoff); - GPR_ASSERT(next_deadlines.next_attempt_start_time - - grpc_exec_ctx_now(&exec_ctx) == - initial_backoff); - exec_ctx.now = next_deadlines.current_deadline; + next_attempt_start_time = backoff.Step(&exec_ctx); + EXPECT_EQ(next_attempt_start_time - grpc_exec_ctx_now(&exec_ctx), + initial_backoff); + exec_ctx.now = next_attempt_start_time; } grpc_exec_ctx_finish(&exec_ctx); } @@ -69,23 +65,16 @@ TEST(BackOffTest, MinConnect) { const double jitter = 0.0; const grpc_millis min_connect_timeout = 200; const grpc_millis max_backoff = 1000; - Backoff::Options options; + BackOff::Options options; options.set_initial_backoff(initial_backoff) .set_multiplier(multiplier) .set_jitter(jitter) .set_min_connect_timeout(min_connect_timeout) .set_max_backoff(max_backoff); - Backoff backoff(options); + BackOff backoff(options); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - Backoff::Result next = backoff.Begin(&exec_ctx); - // Because the min_connect_timeout > initial_backoff, current_deadline is used - // as the deadline for the current attempt. - GPR_ASSERT(next.current_deadline - grpc_exec_ctx_now(&exec_ctx) == - min_connect_timeout); - // ... while, if the current attempt fails, the next one will happen after - // initial_backoff. - GPR_ASSERT(next.next_attempt_start_time - grpc_exec_ctx_now(&exec_ctx) == - initial_backoff); + grpc_millis next = backoff.Begin(&exec_ctx); + EXPECT_EQ(next - grpc_exec_ctx_now(&exec_ctx), initial_backoff); grpc_exec_ctx_finish(&exec_ctx); } @@ -95,57 +84,55 @@ TEST(BackOffTest, NoJitterBackOff) { const double jitter = 0.0; const grpc_millis min_connect_timeout = 1; const grpc_millis max_backoff = 513; - Backoff::Options options; + BackOff::Options options; options.set_initial_backoff(initial_backoff) .set_multiplier(multiplier) .set_jitter(jitter) .set_min_connect_timeout(min_connect_timeout) .set_max_backoff(max_backoff); - Backoff backoff(options); + BackOff backoff(options); // x_1 = 2 // x_n = 2**i + x_{i-1} ( = 2**(n+1) - 2 ) grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; exec_ctx.now = 0; exec_ctx.now_is_valid = true; - Backoff::Result next_deadlines = backoff.Begin(&exec_ctx); - GPR_ASSERT(next_deadlines.current_deadline == - next_deadlines.next_attempt_start_time); - GPR_ASSERT(next_deadlines.current_deadline == 2); - exec_ctx.now = next_deadlines.current_deadline; - next_deadlines = backoff.Step(&exec_ctx); - GPR_ASSERT(next_deadlines.current_deadline == 6); - exec_ctx.now = next_deadlines.current_deadline; - next_deadlines = backoff.Step(&exec_ctx); - GPR_ASSERT(next_deadlines.current_deadline == 14); - exec_ctx.now = next_deadlines.current_deadline; - next_deadlines = backoff.Step(&exec_ctx); - GPR_ASSERT(next_deadlines.current_deadline == 30); - exec_ctx.now = next_deadlines.current_deadline; - next_deadlines = backoff.Step(&exec_ctx); - GPR_ASSERT(next_deadlines.current_deadline == 62); - exec_ctx.now = next_deadlines.current_deadline; - next_deadlines = backoff.Step(&exec_ctx); - GPR_ASSERT(next_deadlines.current_deadline == 126); - exec_ctx.now = next_deadlines.current_deadline; - next_deadlines = backoff.Step(&exec_ctx); - GPR_ASSERT(next_deadlines.current_deadline == 254); - exec_ctx.now = next_deadlines.current_deadline; - next_deadlines = backoff.Step(&exec_ctx); - GPR_ASSERT(next_deadlines.current_deadline == 510); - exec_ctx.now = next_deadlines.current_deadline; - next_deadlines = backoff.Step(&exec_ctx); - GPR_ASSERT(next_deadlines.current_deadline == 1022); - exec_ctx.now = next_deadlines.current_deadline; - next_deadlines = backoff.Step(&exec_ctx); + grpc_millis next = backoff.Begin(&exec_ctx); + EXPECT_EQ(next, 2); + exec_ctx.now = next; + next = backoff.Step(&exec_ctx); + EXPECT_EQ(next, 6); + exec_ctx.now = next; + next = backoff.Step(&exec_ctx); + EXPECT_EQ(next, 14); + exec_ctx.now = next; + next = backoff.Step(&exec_ctx); + EXPECT_EQ(next, 30); + exec_ctx.now = next; + next = backoff.Step(&exec_ctx); + EXPECT_EQ(next, 62); + exec_ctx.now = next; + next = backoff.Step(&exec_ctx); + EXPECT_EQ(next, 126); + exec_ctx.now = next; + next = backoff.Step(&exec_ctx); + EXPECT_EQ(next, 254); + exec_ctx.now = next; + next = backoff.Step(&exec_ctx); + EXPECT_EQ(next, 510); + exec_ctx.now = next; + next = backoff.Step(&exec_ctx); + EXPECT_EQ(next, 1022); + exec_ctx.now = next; + next = backoff.Step(&exec_ctx); // Hit the maximum timeout. From this point onwards, retries will increase // only by max timeout. - GPR_ASSERT(next_deadlines.current_deadline == 1535); - exec_ctx.now = next_deadlines.current_deadline; - next_deadlines = backoff.Step(&exec_ctx); - GPR_ASSERT(next_deadlines.current_deadline == 2048); - exec_ctx.now = next_deadlines.current_deadline; - next_deadlines = backoff.Step(&exec_ctx); - GPR_ASSERT(next_deadlines.current_deadline == 2561); + EXPECT_EQ(next, 1535); + exec_ctx.now = next; + next = backoff.Step(&exec_ctx); + EXPECT_EQ(next, 2048); + exec_ctx.now = next; + next = backoff.Step(&exec_ctx); + EXPECT_EQ(next, 2561); grpc_exec_ctx_finish(&exec_ctx); } @@ -156,23 +143,19 @@ TEST(BackOffTest, JitterBackOff) { const grpc_millis min_connect_timeout = 100; const double multiplier = 1.0; const double jitter = 0.1; - Backoff::Options options; + BackOff::Options options; options.set_initial_backoff(initial_backoff) .set_multiplier(multiplier) .set_jitter(jitter) .set_min_connect_timeout(min_connect_timeout) .set_max_backoff(max_backoff); - Backoff backoff(options); + BackOff backoff(options); backoff.SetRandomSeed(0); // force consistent PRNG grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - Backoff::Result next_deadlines = backoff.Begin(&exec_ctx); - GPR_ASSERT(next_deadlines.current_deadline - grpc_exec_ctx_now(&exec_ctx) == - initial_backoff); - GPR_ASSERT(next_deadlines.next_attempt_start_time - - grpc_exec_ctx_now(&exec_ctx) == - initial_backoff); + grpc_millis next = backoff.Begin(&exec_ctx); + EXPECT_EQ(next - grpc_exec_ctx_now(&exec_ctx), initial_backoff); grpc_millis expected_next_lower_bound = (grpc_millis)((double)current_backoff * (1 - jitter)); @@ -180,20 +163,19 @@ TEST(BackOffTest, JitterBackOff) { (grpc_millis)((double)current_backoff * (1 + jitter)); for (int i = 0; i < 10000; i++) { - next_deadlines = backoff.Step(&exec_ctx); + next = backoff.Step(&exec_ctx); // next-now must be within (jitter*100)% of the current backoff (which // increases by * multiplier up to max_backoff). - const grpc_millis timeout_millis = - next_deadlines.current_deadline - grpc_exec_ctx_now(&exec_ctx); - GPR_ASSERT(timeout_millis >= expected_next_lower_bound); - GPR_ASSERT(timeout_millis <= expected_next_upper_bound); - current_backoff = GPR_MIN( + const grpc_millis timeout_millis = next - grpc_exec_ctx_now(&exec_ctx); + EXPECT_GE(timeout_millis, expected_next_lower_bound); + EXPECT_LE(timeout_millis, expected_next_upper_bound); + current_backoff = std::min( (grpc_millis)((double)current_backoff * multiplier), max_backoff); expected_next_lower_bound = (grpc_millis)((double)current_backoff * (1 - jitter)); expected_next_upper_bound = (grpc_millis)((double)current_backoff * (1 + jitter)); - exec_ctx.now = next_deadlines.current_deadline; + exec_ctx.now = next; } grpc_exec_ctx_finish(&exec_ctx); } -- cgit v1.2.3 From 8df0cc3363475b2f8d0775a95809fa7149e26af1 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Tue, 5 Dec 2017 14:13:09 -0800 Subject: PR Comments --- .../ext/filters/client_channel/lb_policy/grpclb/grpclb.cc | 2 -- .../client_channel/resolver/dns/c_ares/dns_resolver_ares.cc | 2 -- .../client_channel/resolver/dns/native/dns_resolver.cc | 2 -- src/core/ext/filters/client_channel/subchannel.cc | 8 -------- src/core/lib/backoff/backoff.h | 13 +++---------- test/core/backoff/backoff_test.cc | 10 ---------- 6 files changed, 3 insertions(+), 34 deletions(-) (limited to 'src/core') diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 28bb9be00e..7e5e1532e2 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -120,7 +120,6 @@ #include "src/core/lib/surface/channel_init.h" #include "src/core/lib/transport/static_metadata.h" -#define GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS 20 #define GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS 1 #define GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER 1.6 #define GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS 120 @@ -1465,7 +1464,6 @@ static void lb_call_init_locked(grpc_exec_ctx* exec_ctx, .set_initial_backoff(GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS * 1000) .set_multiplier(GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER) .set_jitter(GRPC_GRPCLB_RECONNECT_JITTER) - .set_min_connect_timeout(GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS * 1000) .set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000); glb_policy->lb_call_backoff.Init(backoff_options); diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc index 7c9cf64bb5..543f5cf343 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc @@ -44,7 +44,6 @@ #include "src/core/lib/support/string.h" #include "src/core/lib/transport/service_config.h" -#define GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS 1 #define GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS 1 #define GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER 1.6 #define GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS 120 @@ -386,7 +385,6 @@ static grpc_resolver* dns_ares_create(grpc_exec_ctx* exec_ctx, .set_initial_backoff(GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS * 1000) .set_multiplier(GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER) .set_jitter(GRPC_DNS_RECONNECT_JITTER) - .set_min_connect_timeout(GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS * 1000) .set_max_backoff(GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000); r->backoff.Init(grpc_core::BackOff(backoff_options)); GRPC_CLOSURE_INIT(&r->dns_ares_on_retry_timer_locked, diff --git a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc index 900330ef28..03a7b446f6 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc @@ -36,7 +36,6 @@ #include "src/core/lib/support/manual_constructor.h" #include "src/core/lib/support/string.h" -#define GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS 1 #define GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS 1 #define GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER 1.6 #define GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS 120 @@ -262,7 +261,6 @@ static grpc_resolver* dns_create(grpc_exec_ctx* exec_ctx, .set_initial_backoff(GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS * 1000) .set_multiplier(GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER) .set_jitter(GRPC_DNS_RECONNECT_JITTER) - .set_min_connect_timeout(GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS * 1000) .set_max_backoff(GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000); r->backoff.Init(grpc_core::BackOff(backoff_options)); return &r->base; diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index dbf66ae71a..136a4de2ee 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -290,8 +290,6 @@ static grpc_core::BackOff::Options extract_backoff_options( const grpc_channel_args* args) { int initial_backoff_ms = GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000; - int min_connect_timeout_ms = - GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS * 1000; int max_backoff_ms = GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000; bool fixed_reconnect_backoff = false; if (args != nullptr) { @@ -299,14 +297,9 @@ static grpc_core::BackOff::Options extract_backoff_options( if (0 == strcmp(args->args[i].key, "grpc.testing.fixed_reconnect_backoff_ms")) { fixed_reconnect_backoff = true; - initial_backoff_ms = min_connect_timeout_ms = max_backoff_ms = - grpc_channel_arg_get_integer(&args->args[i], - {initial_backoff_ms, 100, INT_MAX}); } else if (0 == strcmp(args->args[i].key, GRPC_ARG_MIN_RECONNECT_BACKOFF_MS)) { fixed_reconnect_backoff = false; - min_connect_timeout_ms = grpc_channel_arg_get_integer( - &args->args[i], {min_connect_timeout_ms, 100, INT_MAX}); } else if (0 == strcmp(args->args[i].key, GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) { fixed_reconnect_backoff = false; @@ -327,7 +320,6 @@ static grpc_core::BackOff::Options extract_backoff_options( : GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER) .set_jitter(fixed_reconnect_backoff ? 0.0 : GRPC_SUBCHANNEL_RECONNECT_JITTER) - .set_min_connect_timeout(min_connect_timeout_ms) .set_max_backoff(max_backoff_ms); return backoff_options; } diff --git a/src/core/lib/backoff/backoff.h b/src/core/lib/backoff/backoff.h index 10bb9edab2..6f2c30d42f 100644 --- a/src/core/lib/backoff/backoff.h +++ b/src/core/lib/backoff/backoff.h @@ -33,10 +33,10 @@ class BackOff { explicit BackOff(const Options& options); /// Begin retry loop: returns the deadline to be used for the next attempt, - /// following the backoff / strategy. + /// following the backoff strategy. grpc_millis Begin(grpc_exec_ctx* exec_ctx); - /// Step a retry loop: returns returns the deadline to be used for the next - /// attempt, / following the backoff / strategy. + /// Step a retry loop: returns the deadline to be used for the next attempt, + /// following the backoff strategy. grpc_millis Step(grpc_exec_ctx* exec_ctx); /// Reset the backoff, so the next grpc_backoff_step will be a /// grpc_backoff_begin. @@ -58,10 +58,6 @@ class BackOff { jitter_ = jitter; return *this; } - Options& set_min_connect_timeout(grpc_millis min_connect_timeout) { - min_connect_timeout_ = min_connect_timeout; - return *this; - } Options& set_max_backoff(grpc_millis max_backoff) { max_backoff_ = max_backoff; return *this; @@ -72,8 +68,6 @@ class BackOff { double multiplier() const { return multiplier_; } /// amount to randomize backoffs double jitter() const { return jitter_; } - /// minimum time between retries - grpc_millis min_connect_timeout() const { return min_connect_timeout_; } /// maximum time between retries grpc_millis max_backoff() const { return max_backoff_; } @@ -81,7 +75,6 @@ class BackOff { grpc_millis initial_backoff_; double multiplier_; double jitter_; - grpc_millis min_connect_timeout_; grpc_millis max_backoff_; }; // class Options diff --git a/test/core/backoff/backoff_test.cc b/test/core/backoff/backoff_test.cc index e0b72cfce0..07e61bc11e 100644 --- a/test/core/backoff/backoff_test.cc +++ b/test/core/backoff/backoff_test.cc @@ -36,13 +36,11 @@ TEST(BackOffTest, ConstantBackOff) { const grpc_millis initial_backoff = 200; const double multiplier = 1.0; const double jitter = 0.0; - const grpc_millis min_connect_timeout = 100; const grpc_millis max_backoff = 1000; BackOff::Options options; options.set_initial_backoff(initial_backoff) .set_multiplier(multiplier) .set_jitter(jitter) - .set_min_connect_timeout(min_connect_timeout) .set_max_backoff(max_backoff); BackOff backoff(options); @@ -54,7 +52,6 @@ TEST(BackOffTest, ConstantBackOff) { next_attempt_start_time = backoff.Step(&exec_ctx); EXPECT_EQ(next_attempt_start_time - grpc_exec_ctx_now(&exec_ctx), initial_backoff); - exec_ctx.now = next_attempt_start_time; } grpc_exec_ctx_finish(&exec_ctx); } @@ -63,13 +60,11 @@ TEST(BackOffTest, MinConnect) { const grpc_millis initial_backoff = 100; const double multiplier = 1.0; const double jitter = 0.0; - const grpc_millis min_connect_timeout = 200; const grpc_millis max_backoff = 1000; BackOff::Options options; options.set_initial_backoff(initial_backoff) .set_multiplier(multiplier) .set_jitter(jitter) - .set_min_connect_timeout(min_connect_timeout) .set_max_backoff(max_backoff); BackOff backoff(options); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; @@ -82,13 +77,11 @@ TEST(BackOffTest, NoJitterBackOff) { const grpc_millis initial_backoff = 2; const double multiplier = 2.0; const double jitter = 0.0; - const grpc_millis min_connect_timeout = 1; const grpc_millis max_backoff = 513; BackOff::Options options; options.set_initial_backoff(initial_backoff) .set_multiplier(multiplier) .set_jitter(jitter) - .set_min_connect_timeout(min_connect_timeout) .set_max_backoff(max_backoff); BackOff backoff(options); // x_1 = 2 @@ -140,14 +133,12 @@ TEST(BackOffTest, JitterBackOff) { const grpc_millis initial_backoff = 500; grpc_millis current_backoff = initial_backoff; const grpc_millis max_backoff = 1000; - const grpc_millis min_connect_timeout = 100; const double multiplier = 1.0; const double jitter = 0.1; BackOff::Options options; options.set_initial_backoff(initial_backoff) .set_multiplier(multiplier) .set_jitter(jitter) - .set_min_connect_timeout(min_connect_timeout) .set_max_backoff(max_backoff); BackOff backoff(options); @@ -175,7 +166,6 @@ TEST(BackOffTest, JitterBackOff) { (grpc_millis)((double)current_backoff * (1 - jitter)); expected_next_upper_bound = (grpc_millis)((double)current_backoff * (1 + jitter)); - exec_ctx.now = next; } grpc_exec_ctx_finish(&exec_ctx); } -- cgit v1.2.3 From bdea93374d2c72fc412bc1688a75c9c0ca1f1af0 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Tue, 5 Dec 2017 16:24:01 -0800 Subject: Undo change to spec and corresponding code --- doc/connection-backoff.md | 5 +++-- src/core/lib/backoff/backoff.cc | 3 +-- 2 files changed, 4 insertions(+), 4 deletions(-) (limited to 'src/core') diff --git a/doc/connection-backoff.md b/doc/connection-backoff.md index 85cd5e258e..0e83d9b97f 100644 --- a/doc/connection-backoff.md +++ b/doc/connection-backoff.md @@ -26,8 +26,9 @@ ConnectWithBackoff() != SUCCESS) SleepUntil(current_deadline) current_backoff = Min(current_backoff * MULTIPLIER, MAX_BACKOFF) - jitter = UniformRandom(-JITTER * current_backoff, JITTER * current_backoff) - current_deadline = now() + Min(current_backoff + jitter, MAX_BACKOFF) + current_deadline = now() + current_backoff + + UniformRandom(-JITTER * current_backoff, JITTER * current_backoff) + ``` With specific parameters of diff --git a/src/core/lib/backoff/backoff.cc b/src/core/lib/backoff/backoff.cc index 5553e6b3f3..500b3d6a8a 100644 --- a/src/core/lib/backoff/backoff.cc +++ b/src/core/lib/backoff/backoff.cc @@ -59,8 +59,7 @@ grpc_millis BackOff::Step(grpc_exec_ctx* exec_ctx) { const double jitter = generate_uniform_random_number_between( &rng_state_, -options_.jitter() * current_backoff_, options_.jitter() * current_backoff_); - const grpc_millis next_timeout = std::min( - (grpc_millis)(current_backoff_ + jitter), options_.max_backoff()); + const grpc_millis next_timeout = (grpc_millis)(current_backoff_ + jitter); return next_timeout + grpc_exec_ctx_now(exec_ctx); } -- cgit v1.2.3 From abe92cc9883361d2beee420d0ffc691e86ffa86a Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Tue, 5 Dec 2017 16:27:36 -0800 Subject: Fix wrong changes to subchannel --- src/core/ext/filters/client_channel/subchannel.cc | 37 +++++++++++++++-------- 1 file changed, 25 insertions(+), 12 deletions(-) (limited to 'src/core') diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 136a4de2ee..dfe823792c 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -123,6 +123,7 @@ struct grpc_subchannel { /** backoff state */ grpc_core::ManualConstructor backoff; grpc_millis next_attempt_deadline; + grpc_millis min_connect_timeout_ms; /** do we have an active alarm? */ bool have_alarm; @@ -286,42 +287,52 @@ void grpc_subchannel_weak_unref(grpc_exec_ctx* exec_ctx, } } -static grpc_core::BackOff::Options extract_backoff_options( - const grpc_channel_args* args) { - int initial_backoff_ms = +static void parse_args_for_backoff_values( + const grpc_channel_args* args, grpc_core::BackOff::Options* backoff_options, + grpc_millis* min_connect_timeout_ms) { + grpc_millis initial_backoff_ms = GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000; - int max_backoff_ms = GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000; + *min_connect_timeout_ms = + GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS * 1000; + grpc_millis max_backoff_ms = + GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000; bool fixed_reconnect_backoff = false; if (args != nullptr) { for (size_t i = 0; i < args->num_args; i++) { if (0 == strcmp(args->args[i].key, "grpc.testing.fixed_reconnect_backoff_ms")) { fixed_reconnect_backoff = true; + initial_backoff_ms = *min_connect_timeout_ms = max_backoff_ms = + grpc_channel_arg_get_integer( + &args->args[i], + {static_cast(initial_backoff_ms), 100, INT_MAX}); } else if (0 == strcmp(args->args[i].key, GRPC_ARG_MIN_RECONNECT_BACKOFF_MS)) { fixed_reconnect_backoff = false; + *min_connect_timeout_ms = grpc_channel_arg_get_integer( + &args->args[i], + {static_cast(*min_connect_timeout_ms), 100, INT_MAX}); } else if (0 == strcmp(args->args[i].key, GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) { fixed_reconnect_backoff = false; max_backoff_ms = grpc_channel_arg_get_integer( - &args->args[i], {max_backoff_ms, 100, INT_MAX}); + &args->args[i], {static_cast(max_backoff_ms), 100, INT_MAX}); } else if (0 == strcmp(args->args[i].key, GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS)) { fixed_reconnect_backoff = false; initial_backoff_ms = grpc_channel_arg_get_integer( - &args->args[i], {initial_backoff_ms, 100, INT_MAX}); + &args->args[i], + {static_cast(initial_backoff_ms), 100, INT_MAX}); } } } - grpc_core::BackOff::Options backoff_options; - backoff_options.set_initial_backoff(initial_backoff_ms) + backoff_options->set_initial_backoff(initial_backoff_ms) .set_multiplier(fixed_reconnect_backoff ? 1.0 : GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER) .set_jitter(fixed_reconnect_backoff ? 0.0 : GRPC_SUBCHANNEL_RECONNECT_JITTER) .set_max_backoff(max_backoff_ms); - return backoff_options; } grpc_subchannel* grpc_subchannel_create(grpc_exec_ctx* exec_ctx, @@ -375,7 +386,10 @@ grpc_subchannel* grpc_subchannel_create(grpc_exec_ctx* exec_ctx, grpc_schedule_on_exec_ctx); grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE, "subchannel"); - c->backoff.Init(extract_backoff_options(args->args)); + grpc_core::BackOff::Options backoff_options; + parse_args_for_backoff_values(args->args, &backoff_options, + &c->min_connect_timeout_ms); + c->backoff.Init(backoff_options); gpr_mu_init(&c->mu); return grpc_subchannel_index_register(exec_ctx, key, c); @@ -435,8 +449,7 @@ static void on_alarm(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { if (error == GRPC_ERROR_NONE) { gpr_log(GPR_INFO, "Failed to connect to channel, retrying"); const grpc_millis min_deadline = - (GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS * 1000) + - grpc_exec_ctx_now(exec_ctx); + (c->min_connect_timeout_ms * 1000) + grpc_exec_ctx_now(exec_ctx); c->next_attempt_deadline = std::max(c->backoff->Step(exec_ctx), min_deadline); continue_connect_locked(exec_ctx, c); -- cgit v1.2.3 From fa4126f0cb0eb266225704deb75b38a5b935d527 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Wed, 6 Dec 2017 10:13:05 -0800 Subject: moved max(..., min_deadline) to the right place in subchannel.cc --- src/core/ext/filters/client_channel/subchannel.cc | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) (limited to 'src/core') diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index dfe823792c..77e20f025a 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -400,7 +400,10 @@ static void continue_connect_locked(grpc_exec_ctx* exec_ctx, grpc_connect_in_args args; args.interested_parties = c->pollset_set; - args.deadline = c->next_attempt_deadline; + const grpc_millis min_deadline = + (c->min_connect_timeout_ms * 1000) + grpc_exec_ctx_now(exec_ctx); + c->next_attempt_deadline = c->backoff->Step(exec_ctx); + args.deadline = std::max(c->next_attempt_deadline, min_deadline); args.channel_args = c->args; grpc_connectivity_state_set(exec_ctx, &c->state_tracker, @@ -448,10 +451,6 @@ static void on_alarm(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { } if (error == GRPC_ERROR_NONE) { gpr_log(GPR_INFO, "Failed to connect to channel, retrying"); - const grpc_millis min_deadline = - (c->min_connect_timeout_ms * 1000) + grpc_exec_ctx_now(exec_ctx); - c->next_attempt_deadline = - std::max(c->backoff->Step(exec_ctx), min_deadline); continue_connect_locked(exec_ctx, c); gpr_mu_unlock(&c->mu); } else { -- cgit v1.2.3 From a5e2da4096c88a5302d8c460c705c9453a389975 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Thu, 7 Dec 2017 10:25:25 -0800 Subject: Do the Step() in the right place --- src/core/ext/filters/client_channel/subchannel.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'src/core') diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 77e20f025a..0db0f48faa 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -402,10 +402,8 @@ static void continue_connect_locked(grpc_exec_ctx* exec_ctx, args.interested_parties = c->pollset_set; const grpc_millis min_deadline = (c->min_connect_timeout_ms * 1000) + grpc_exec_ctx_now(exec_ctx); - c->next_attempt_deadline = c->backoff->Step(exec_ctx); args.deadline = std::max(c->next_attempt_deadline, min_deadline); args.channel_args = c->args; - grpc_connectivity_state_set(exec_ctx, &c->state_tracker, GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE, "state_change"); @@ -451,6 +449,7 @@ static void on_alarm(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { } if (error == GRPC_ERROR_NONE) { gpr_log(GPR_INFO, "Failed to connect to channel, retrying"); + c->next_attempt_deadline = c->backoff->Step(exec_ctx); continue_connect_locked(exec_ctx, c); gpr_mu_unlock(&c->mu); } else { -- cgit v1.2.3 From 62d86e9987121c8bd79d4594fb0db019c4faafad Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Wed, 13 Dec 2017 13:53:57 -0800 Subject: Fixed wrong conversion factor --- src/core/ext/filters/client_channel/subchannel.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'src/core') diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 0db0f48faa..ce061c9782 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -398,10 +398,9 @@ grpc_subchannel* grpc_subchannel_create(grpc_exec_ctx* exec_ctx, static void continue_connect_locked(grpc_exec_ctx* exec_ctx, grpc_subchannel* c) { grpc_connect_in_args args; - args.interested_parties = c->pollset_set; const grpc_millis min_deadline = - (c->min_connect_timeout_ms * 1000) + grpc_exec_ctx_now(exec_ctx); + c->min_connect_timeout_ms + grpc_exec_ctx_now(exec_ctx); args.deadline = std::max(c->next_attempt_deadline, min_deadline); args.channel_args = c->args; grpc_connectivity_state_set(exec_ctx, &c->state_tracker, -- cgit v1.2.3 From 837a47918bfd60f7a1a8be248db80913e080e76a Mon Sep 17 00:00:00 2001 From: ita9naiwa Date: Tue, 19 Dec 2017 03:38:12 +0900 Subject: minor typo : becuase -> because --- src/core/lib/iomgr/ev_epoll1_linux.cc | 2 +- src/core/lib/iomgr/ev_epollex_linux.cc | 2 +- src/core/lib/iomgr/ev_epollsig_linux.cc | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) (limited to 'src/core') diff --git a/src/core/lib/iomgr/ev_epoll1_linux.cc b/src/core/lib/iomgr/ev_epoll1_linux.cc index ae9d47ece5..92b4b8b90b 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.cc +++ b/src/core/lib/iomgr/ev_epoll1_linux.cc @@ -1233,7 +1233,7 @@ const grpc_event_engine_vtable* grpc_init_epoll1_linux(bool explicit_request) { * NULL */ const grpc_event_engine_vtable* grpc_init_epoll1_linux(bool explicit_request) { gpr_log(GPR_ERROR, - "Skipping epoll1 becuase GRPC_LINUX_EPOLL is not defined."); + "Skipping epoll1 because GRPC_LINUX_EPOLL is not defined."); return nullptr; } #endif /* defined(GRPC_POSIX_SOCKET) */ diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc index b2817156a8..df2f629703 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.cc +++ b/src/core/lib/iomgr/ev_epollex_linux.cc @@ -1450,7 +1450,7 @@ const grpc_event_engine_vtable* grpc_init_epollex_linux( const grpc_event_engine_vtable* grpc_init_epollex_linux( bool explicitly_requested) { gpr_log(GPR_ERROR, - "Skipping epollex becuase GRPC_LINUX_EPOLL is not defined."); + "Skipping epollex because GRPC_LINUX_EPOLL is not defined."); return nullptr; } #endif /* defined(GRPC_POSIX_SOCKET) */ diff --git a/src/core/lib/iomgr/ev_epollsig_linux.cc b/src/core/lib/iomgr/ev_epollsig_linux.cc index 7a8962f4a8..bc548a1fda 100644 --- a/src/core/lib/iomgr/ev_epollsig_linux.cc +++ b/src/core/lib/iomgr/ev_epollsig_linux.cc @@ -1733,7 +1733,7 @@ const grpc_event_engine_vtable* grpc_init_epollsig_linux( const grpc_event_engine_vtable* grpc_init_epollsig_linux( bool explicit_request) { gpr_log(GPR_ERROR, - "Skipping epollsig becuase GRPC_LINUX_EPOLL is not defined."); + "Skipping epollsig because GRPC_LINUX_EPOLL is not defined."); return nullptr; } #endif /* defined(GRPC_POSIX_SOCKET) */ -- cgit v1.2.3 From ffe6e03ecd8be29448b7c6c7837c11bfd5d3aa8f Mon Sep 17 00:00:00 2001 From: Noah Eisen Date: Tue, 19 Dec 2017 07:39:03 -0800 Subject: Fix internal TSAN bug --- src/core/lib/iomgr/error.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/core') diff --git a/src/core/lib/iomgr/error.cc b/src/core/lib/iomgr/error.cc index 42cd7c455d..67c3caf5ee 100644 --- a/src/core/lib/iomgr/error.cc +++ b/src/core/lib/iomgr/error.cc @@ -749,7 +749,7 @@ const char* grpc_error_string(grpc_error* err) { if (!gpr_atm_rel_cas(&err->atomics.error_string, 0, (gpr_atm)out)) { gpr_free(out); - out = (char*)gpr_atm_no_barrier_load(&err->atomics.error_string); + out = (char*)gpr_atm_acq_load(&err->atomics.error_string); } GPR_TIMER_END("grpc_error_string", 0); -- cgit v1.2.3 From 5f31f01a8c60858d3607df43c77977408f1dc180 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Tue, 19 Dec 2017 08:49:15 -0800 Subject: Fix use-after-free caused by unsync'd access in tcp_client_posix. tc_on_alarm() and on_writable() race, resulting in the following: ``` D1219 08:59:33.425951347 86323 tcp_client_posix.cc:143] CLIENT_CONNECT: ipv4:127.0.0.1:27465: on_writable: error="No Error" D1219 08:59:33.426032150 86342 tcp_client_posix.cc:104] CLIENT_CONNECT: ipv4:127.0.0.1:27465: on_alarm: error="No Error" // At this point, note that the callbacks are running on different threads. D1219 08:59:33.426063521 86323 tcp_client_posix.cc:218] XXX on_writable ac->addr_str 0x603000008dd0 before unlock. # refs 2->1. Done 0 // on_writable() unrefs while still holding the lock. Because refs > 0, it marks its "done" as false and unlocks. D1219 08:59:33.426125130 86342 tcp_client_posix.cc:113] XXX tc_on_alarm ac->addr_str 0x603000008dd0 before unlock. # refs 1->0. Done 1 // right after on_writable() unlocks, tc_on_alarm() acquires the lock and unrefs, this time getting to zero and marking its "done" as true. // It then proceeds to destroy "ac", and, in particular for this failure, "ac->addr_str". D1219 08:59:33.426139370 86323 tcp_client_posix.cc:234] XXX on_writable about to read from ac->addr_str 0x603000008dd0. Done 0, error=OS Error // When on_writable() tries to read ac->addr_str to assemble its error details, it causes a use-after-free. ``` The problem is the lock isn't held long enough by on_writable(). Alternatively, a copy of ac->addr_str could be made in on_writable() while still holding the lock, but that seems more fragile. It doesn't seem that holding the lock longer would be a performance issue, given we are in a failure scenario. --- src/core/lib/iomgr/tcp_client_posix.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/core') diff --git a/src/core/lib/iomgr/tcp_client_posix.cc b/src/core/lib/iomgr/tcp_client_posix.cc index 24ccab14b2..799dda5c3e 100644 --- a/src/core/lib/iomgr/tcp_client_posix.cc +++ b/src/core/lib/iomgr/tcp_client_posix.cc @@ -212,7 +212,6 @@ finish: fd = nullptr; } done = (--ac->refs == 0); - gpr_mu_unlock(&ac->mu); if (error != GRPC_ERROR_NONE) { char* error_descr; grpc_slice str; @@ -227,6 +226,7 @@ finish: error = grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS, grpc_slice_from_copied_string(ac->addr_str)); } + gpr_mu_unlock(&ac->mu); if (done) { gpr_mu_destroy(&ac->mu); gpr_free(ac->addr_str); -- cgit v1.2.3 From 4ef4c38275ce558ee608f95fde36195a62eb2389 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Tue, 19 Dec 2017 09:34:18 -0800 Subject: Create copy of "ac" data instead of stretching the locked section --- src/core/lib/iomgr/tcp_client_posix.cc | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) (limited to 'src/core') diff --git a/src/core/lib/iomgr/tcp_client_posix.cc b/src/core/lib/iomgr/tcp_client_posix.cc index 799dda5c3e..40ba1623a2 100644 --- a/src/core/lib/iomgr/tcp_client_posix.cc +++ b/src/core/lib/iomgr/tcp_client_posix.cc @@ -212,6 +212,10 @@ finish: fd = nullptr; } done = (--ac->refs == 0); + // Create a copy of the data from "ac" to be accessed after the unlock, as + // "ac" and its contents may be deallocated by the time they are read. + const grpc_slice addr_str_slice = grpc_slice_from_copied_string(ac->addr_str); + gpr_mu_unlock(&ac->mu); if (error != GRPC_ERROR_NONE) { char* error_descr; grpc_slice str; @@ -224,10 +228,12 @@ finish: gpr_free(error_descr); gpr_free(desc); error = grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS, - grpc_slice_from_copied_string(ac->addr_str)); + addr_str_slice); + grpc_slice_unref(addr_str_slice); } - gpr_mu_unlock(&ac->mu); if (done) { + // This is safe even outside the lock, because "done", the sentinel, is + // populated *inside* the lock. gpr_mu_destroy(&ac->mu); gpr_free(ac->addr_str); grpc_channel_args_destroy(ac->channel_args); -- cgit v1.2.3 From 2df509fc0e8628d6d4431139ce953c70796a21eb Mon Sep 17 00:00:00 2001 From: Ken Payson Date: Wed, 6 Dec 2017 14:32:34 -0800 Subject: Fix a Python spinlock bug --- src/core/lib/iomgr/ev_poll_posix.cc | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) (limited to 'src/core') diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc index 8659559f78..13468b30c0 100644 --- a/src/core/lib/iomgr/ev_poll_posix.cc +++ b/src/core/lib/iomgr/ev_poll_posix.cc @@ -71,6 +71,7 @@ struct grpc_fd { int shutdown; int closed; int released; + gpr_atm pollhup; grpc_error* shutdown_error; /* The watcher list. @@ -339,6 +340,7 @@ static grpc_fd* fd_create(int fd, const char* name) { r->on_done_closure = nullptr; r->closed = 0; r->released = 0; + gpr_atm_no_barrier_store(&r->pollhup, 0); r->read_notifier_pollset = nullptr; char* name2; @@ -964,7 +966,8 @@ static grpc_error* pollset_work(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, pfds[0].events = POLLIN; pfds[0].revents = 0; for (i = 0; i < pollset->fd_count; i++) { - if (fd_is_orphaned(pollset->fds[i])) { + if (fd_is_orphaned(pollset->fds[i]) || + gpr_atm_no_barrier_load(&pollset->fds[i]->pollhup) == 1) { GRPC_FD_UNREF(pollset->fds[i], "multipoller"); } else { pollset->fds[fd_count++] = pollset->fds[i]; @@ -1031,6 +1034,12 @@ static grpc_error* pollset_work(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, pfds[i].fd, (pfds[i].revents & POLLIN_CHECK) != 0, (pfds[i].revents & POLLOUT_CHECK) != 0, pfds[i].revents); } + /* This is a mitigation to prevent poll() from spinning on a + ** POLLHUP https://github.com/grpc/grpc/pull/13665 + */ + if (pfds[i].revents & POLLHUP) { + gpr_atm_no_barrier_store(&watchers[i].fd->pollhup, 1); + } fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN_CHECK, pfds[i].revents & POLLOUT_CHECK, pollset); } -- cgit v1.2.3 From c41bbd3c33df25a25014e14fbeee4ced8f1433fa Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Tue, 19 Dec 2017 14:59:41 -0800 Subject: Fix wrong unreffing of slice --- src/core/lib/iomgr/tcp_client_posix.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'src/core') diff --git a/src/core/lib/iomgr/tcp_client_posix.cc b/src/core/lib/iomgr/tcp_client_posix.cc index 40ba1623a2..8cd5f8d618 100644 --- a/src/core/lib/iomgr/tcp_client_posix.cc +++ b/src/core/lib/iomgr/tcp_client_posix.cc @@ -228,7 +228,8 @@ finish: gpr_free(error_descr); gpr_free(desc); error = grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS, - addr_str_slice); + addr_str_slice /* takes ownership */); + } else { grpc_slice_unref(addr_str_slice); } if (done) { -- cgit v1.2.3 From 77346746aa69920dc949e201d56c0f11192eaa31 Mon Sep 17 00:00:00 2001 From: ncteisen Date: Tue, 19 Dec 2017 15:20:15 -0800 Subject: Add ownership semantic comments to error.h --- src/core/lib/iomgr/error.h | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'src/core') diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h index 4759ee0791..8c72a439f6 100644 --- a/src/core/lib/iomgr/error.h +++ b/src/core/lib/iomgr/error.h @@ -165,6 +165,8 @@ void grpc_error_unref(grpc_error* err); grpc_error* grpc_error_set_int(grpc_error* src, grpc_error_ints which, intptr_t value) GRPC_MUST_USE_RESULT; bool grpc_error_get_int(grpc_error* error, grpc_error_ints which, intptr_t* p); +/// This call takes ownership of the slice; the error is responsible for +/// eventually unref-ing it. grpc_error* grpc_error_set_str(grpc_error* src, grpc_error_strs which, grpc_slice str) GRPC_MUST_USE_RESULT; /// Returns false if the specified string is not set. @@ -174,7 +176,8 @@ bool grpc_error_get_str(grpc_error* error, grpc_error_strs which, /// Add a child error: an error that is believed to have contributed to this /// error occurring. Allows root causing high level errors from lower level -/// errors that contributed to them. +/// errors that contributed to them. The src error takes ownership of the +/// child error. grpc_error* grpc_error_add_child(grpc_error* src, grpc_error* child) GRPC_MUST_USE_RESULT; grpc_error* grpc_os_error(const char* file, int line, int err, -- cgit v1.2.3 From 13a3e8cc2a42070ef8a14dc6b22c37cfc45fecaa Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Tue, 19 Dec 2017 15:33:06 -0800 Subject: Add namespace to macro expansion --- src/core/lib/support/debug_location.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src/core') diff --git a/src/core/lib/support/debug_location.h b/src/core/lib/support/debug_location.h index 0939da595d..9b3f9220fc 100644 --- a/src/core/lib/support/debug_location.h +++ b/src/core/lib/support/debug_location.h @@ -36,7 +36,7 @@ class DebugLocation { const char* file_; const int line_; }; -#define DEBUG_LOCATION DebugLocation(__FILE__, __LINE__) +#define DEBUG_LOCATION ::grpc_core::DebugLocation(__FILE__, __LINE__) #else class DebugLocation { public: @@ -44,7 +44,7 @@ class DebugLocation { const char* file() const { return nullptr; } int line() const { return -1; } }; -#define DEBUG_LOCATION DebugLocation() +#define DEBUG_LOCATION ::grpc_core::DebugLocation() #endif } // namespace grpc_core -- cgit v1.2.3 From 0d18814106f19197e80366ee147b4c1565fadf96 Mon Sep 17 00:00:00 2001 From: Dan Zhang Date: Wed, 20 Dec 2017 15:07:38 -0500 Subject: Add a start_cb to grpc_udp_listener to be called when listener is created. --- src/core/lib/iomgr/udp_server.cc | 20 +++++++++++++------- src/core/lib/iomgr/udp_server.h | 6 +++++- test/core/iomgr/udp_server_test.cc | 25 +++++++++++++++---------- 3 files changed, 33 insertions(+), 18 deletions(-) (limited to 'src/core') diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc index 55e0b165ec..4a97f3353d 100644 --- a/src/core/lib/iomgr/udp_server.cc +++ b/src/core/lib/iomgr/udp_server.cc @@ -72,6 +72,7 @@ struct grpc_udp_listener { grpc_udp_server_read_cb read_cb; grpc_udp_server_write_cb write_cb; grpc_udp_server_orphan_cb orphan_cb; + grpc_udp_server_start_cb start_cb; // To be scheduled on another thread to actually read/write. grpc_closure do_read_closure; grpc_closure do_write_closure; @@ -353,7 +354,7 @@ static void do_read(void* arg, grpc_error* error) { * read lock if available. */ gpr_mu_lock(&sp->server->mu); /* Tell the registered callback that data is available to read. */ - if (!sp->already_shutdown && sp->read_cb(sp->emfd, sp->server->user_data)) { + if (!sp->already_shutdown && sp->read_cb(sp->emfd)) { /* There maybe more packets to read. Schedule read_more_cb_ closure to run * after finishing this event loop. */ GRPC_CLOSURE_SCHED(&sp->do_read_closure, GRPC_ERROR_NONE); @@ -383,7 +384,7 @@ static void on_read(void* arg, grpc_error* error) { /* Read once. If there is more data to read, off load the work to another * thread to finish. */ GPR_ASSERT(sp->read_cb); - if (sp->read_cb(sp->emfd, sp->server->user_data)) { + if (sp->read_cb(sp->emfd)) { /* There maybe more packets to read. Schedule read_more_cb_ closure to run * after finishing this event loop. */ GRPC_CLOSURE_INIT(&sp->do_read_closure, do_read, arg, @@ -411,7 +412,7 @@ void fd_notify_on_write_wrapper(void* arg, grpc_error* error) { static void do_write(void* arg, grpc_error* error) { grpc_udp_listener* sp = reinterpret_cast(arg); - gpr_mu_lock(&(sp->server->mu)); + gpr_mu_lock(&sp->server->mu); if (sp->already_shutdown) { // If fd has been shutdown, don't write any more and re-arm notification. grpc_fd_notify_on_write(sp->emfd, &sp->write_closure); @@ -429,7 +430,7 @@ static void do_write(void* arg, grpc_error* error) { static void on_write(void* arg, grpc_error* error) { grpc_udp_listener* sp = (grpc_udp_listener*)arg; - gpr_mu_lock(&(sp->server->mu)); + gpr_mu_lock(&sp->server->mu); if (error != GRPC_ERROR_NONE) { if (0 == --sp->server->active_ports && sp->server->shutdown) { gpr_mu_unlock(&sp->server->mu); @@ -450,6 +451,7 @@ static void on_write(void* arg, grpc_error* error) { static int add_socket_to_server(grpc_udp_server* s, int fd, const grpc_resolved_address* addr, + grpc_udp_server_start_cb start_cb, grpc_udp_server_read_cb read_cb, grpc_udp_server_write_cb write_cb, grpc_udp_server_orphan_cb orphan_cb) { @@ -480,6 +482,7 @@ static int add_socket_to_server(grpc_udp_server* s, int fd, sp->read_cb = read_cb; sp->write_cb = write_cb; sp->orphan_cb = orphan_cb; + sp->start_cb = start_cb; sp->orphan_notified = false; sp->already_shutdown = false; GPR_ASSERT(sp->emfd); @@ -492,6 +495,7 @@ static int add_socket_to_server(grpc_udp_server* s, int fd, int grpc_udp_server_add_port(grpc_udp_server* s, const grpc_resolved_address* addr, + grpc_udp_server_start_cb start_cb, grpc_udp_server_read_cb read_cb, grpc_udp_server_write_cb write_cb, grpc_udp_server_orphan_cb orphan_cb) { @@ -541,8 +545,8 @@ int grpc_udp_server_add_port(grpc_udp_server* s, // TODO(rjshade): Test and propagate the returned grpc_error*: GRPC_ERROR_UNREF(grpc_create_dualstack_socket_using_factory( s->socket_factory, addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd)); - allocated_port1 = - add_socket_to_server(s, fd, addr, read_cb, write_cb, orphan_cb); + allocated_port1 = add_socket_to_server(s, fd, addr, start_cb, read_cb, + write_cb, orphan_cb); if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) { goto done; } @@ -565,7 +569,7 @@ int grpc_udp_server_add_port(grpc_udp_server* s, addr = &addr4_copy; } allocated_port2 = - add_socket_to_server(s, fd, addr, read_cb, write_cb, orphan_cb); + add_socket_to_server(s, fd, addr, start_cb, read_cb, write_cb, orphan_cb); done: gpr_free(allocated_addr); @@ -587,6 +591,7 @@ int grpc_udp_server_get_fd(grpc_udp_server* s, unsigned port_index) { void grpc_udp_server_start(grpc_udp_server* s, grpc_pollset** pollsets, size_t pollset_count, void* user_data) { + gpr_log(GPR_DEBUG, "grpc_udp_server_start"); size_t i; gpr_mu_lock(&s->mu); grpc_udp_listener* sp; @@ -596,6 +601,7 @@ void grpc_udp_server_start(grpc_udp_server* s, grpc_pollset** pollsets, sp = s->head; while (sp != nullptr) { + sp->start_cb(sp->emfd, sp->server->user_data); for (i = 0; i < pollset_count; i++) { grpc_pollset_add_fd(pollsets[i], sp->emfd); } diff --git a/src/core/lib/iomgr/udp_server.h b/src/core/lib/iomgr/udp_server.h index 02e3acb7f5..a469ab9be5 100644 --- a/src/core/lib/iomgr/udp_server.h +++ b/src/core/lib/iomgr/udp_server.h @@ -30,9 +30,12 @@ struct grpc_server; /* Forward decl of grpc_udp_server */ typedef struct grpc_udp_server grpc_udp_server; +/* Called when grpc server starts to listening on the grpc_fd. */ +typedef void (*grpc_udp_server_start_cb)(grpc_fd* emfd, void* user_data); + /* Called when data is available to read from the socket. * Return true if there is more data to read from fd. */ -typedef bool (*grpc_udp_server_read_cb)(grpc_fd* emfd, void* user_data); +typedef bool (*grpc_udp_server_read_cb)(grpc_fd* emfd); /* Called when the socket is writeable. The given closure should be scheduled * when the socket becomes blocked next time. */ @@ -65,6 +68,7 @@ int grpc_udp_server_get_fd(grpc_udp_server* s, unsigned port_index); all of the multiple socket port matching logic in one place */ int grpc_udp_server_add_port(grpc_udp_server* s, const grpc_resolved_address* addr, + grpc_udp_server_start_cb start_cb, grpc_udp_server_read_cb read_cb, grpc_udp_server_write_cb write_cb, grpc_udp_server_orphan_cb orphan_cb); diff --git a/test/core/iomgr/udp_server_test.cc b/test/core/iomgr/udp_server_test.cc index 0deb534abd..f4d2243e05 100644 --- a/test/core/iomgr/udp_server_test.cc +++ b/test/core/iomgr/udp_server_test.cc @@ -49,8 +49,13 @@ static int g_number_of_reads = 0; static int g_number_of_writes = 0; static int g_number_of_bytes_read = 0; static int g_number_of_orphan_calls = 0; +static int g_number_of_starts = 0; -static bool on_read(grpc_fd* emfd, void* user_data) { +static void on_start(grpc_fd* emfd, void* user_data) { + g_number_of_starts++; +} + +static bool on_read(grpc_fd* emfd) { char read_buffer[512]; ssize_t byte_count; @@ -154,8 +159,8 @@ static void test_no_op_with_port(void) { memset(&resolved_addr, 0, sizeof(resolved_addr)); resolved_addr.len = sizeof(struct sockaddr_in); addr->sin_family = AF_INET; - GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, on_read, on_write, - on_fd_orphaned)); + GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, on_start, on_read, + on_write, on_fd_orphaned)); grpc_udp_server_destroy(s, nullptr); @@ -182,8 +187,8 @@ static void test_no_op_with_port_and_socket_factory(void) { memset(&resolved_addr, 0, sizeof(resolved_addr)); resolved_addr.len = sizeof(struct sockaddr_in); addr->sin_family = AF_INET; - GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, on_read, on_write, - on_fd_orphaned)); + GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, on_start, on_read, + on_write, on_fd_orphaned)); GPR_ASSERT(socket_factory->number_of_socket_calls == 1); GPR_ASSERT(socket_factory->number_of_bind_calls == 1); @@ -206,11 +211,11 @@ static void test_no_op_with_port_and_start(void) { memset(&resolved_addr, 0, sizeof(resolved_addr)); resolved_addr.len = sizeof(struct sockaddr_in); addr->sin_family = AF_INET; - GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, on_read, on_write, - on_fd_orphaned)); + GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, on_start, on_read, + on_write, on_fd_orphaned)); grpc_udp_server_start(s, nullptr, 0, nullptr); - + GPR_ASSERT(g_number_of_starts == 1); grpc_udp_server_destroy(s, nullptr); /* The server had a single FD, which is orphaned exactly once in * @@ -236,8 +241,8 @@ static void test_receive(int number_of_clients) { memset(&resolved_addr, 0, sizeof(resolved_addr)); resolved_addr.len = sizeof(struct sockaddr_storage); addr->ss_family = AF_INET; - GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, on_read, on_write, - on_fd_orphaned)); + GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, on_start, on_read, + on_write, on_fd_orphaned)); svrfd = grpc_udp_server_get_fd(s, 0); GPR_ASSERT(svrfd >= 0); -- cgit v1.2.3 From 1e522efa411fb27b841c3ac888ca2006ea77a5dd Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Thu, 21 Dec 2017 16:16:16 -0800 Subject: Separate client and server keepalive_permit_without_calls --- .../transport/chttp2/transport/chttp2_transport.cc | 32 ++++++++++++++++------ 1 file changed, 23 insertions(+), 9 deletions(-) (limited to 'src/core') diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 4d81be10e9..7c77de2168 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -79,7 +79,9 @@ static int g_default_server_keepalive_time_ms = DEFAULT_SERVER_KEEPALIVE_TIME_MS; static int g_default_server_keepalive_timeout_ms = DEFAULT_SERVER_KEEPALIVE_TIMEOUT_MS; -static bool g_default_keepalive_permit_without_calls = +static bool g_default_client_keepalive_permit_without_calls = + DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS; +static bool g_default_server_keepalive_permit_without_calls = DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS; static int g_default_min_sent_ping_interval_without_data_ms = @@ -343,6 +345,8 @@ static void init_transport(grpc_chttp2_transport* t, t->keepalive_timeout = g_default_client_keepalive_timeout_ms == INT_MAX ? GRPC_MILLIS_INF_FUTURE : g_default_client_keepalive_timeout_ms; + t->keepalive_permit_without_calls = + g_default_client_keepalive_permit_without_calls; } else { t->keepalive_time = g_default_server_keepalive_time_ms == INT_MAX ? GRPC_MILLIS_INF_FUTURE @@ -350,8 +354,9 @@ static void init_transport(grpc_chttp2_transport* t, t->keepalive_timeout = g_default_server_keepalive_timeout_ms == INT_MAX ? GRPC_MILLIS_INF_FUTURE : g_default_server_keepalive_timeout_ms; + t->keepalive_permit_without_calls = + g_default_server_keepalive_permit_without_calls; } - t->keepalive_permit_without_calls = g_default_keepalive_permit_without_calls; t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY; @@ -2521,7 +2526,9 @@ void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args, for (i = 0; i < args->num_args; i++) { if (0 == strcmp(args->args[i].key, GRPC_ARG_KEEPALIVE_TIME_MS)) { const int value = grpc_channel_arg_get_integer( - &args->args[i], {g_default_client_keepalive_time_ms, 1, INT_MAX}); + &args->args[i], {is_client ? g_default_client_keepalive_time_ms + : g_default_server_keepalive_time_ms, + 1, INT_MAX}); if (is_client) { g_default_client_keepalive_time_ms = value; } else { @@ -2530,8 +2537,9 @@ void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args, } else if (0 == strcmp(args->args[i].key, GRPC_ARG_KEEPALIVE_TIMEOUT_MS)) { const int value = grpc_channel_arg_get_integer( - &args->args[i], - {g_default_client_keepalive_timeout_ms, 0, INT_MAX}); + &args->args[i], {is_client ? g_default_client_keepalive_timeout_ms + : g_default_server_keepalive_timeout_ms, + 0, INT_MAX}); if (is_client) { g_default_client_keepalive_timeout_ms = value; } else { @@ -2539,10 +2547,16 @@ void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args, } } else if (0 == strcmp(args->args[i].key, GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)) { - g_default_keepalive_permit_without_calls = - (uint32_t)grpc_channel_arg_get_integer( - &args->args[i], - {g_default_keepalive_permit_without_calls, 0, 1}); + const bool value = (uint32_t)grpc_channel_arg_get_integer( + &args->args[i], + {is_client ? g_default_client_keepalive_permit_without_calls + : g_default_server_keepalive_timeout_ms, + 0, 1}); + if (is_client) { + g_default_client_keepalive_permit_without_calls = value; + } else { + g_default_server_keepalive_permit_without_calls = value; + } } else if (0 == strcmp(args->args[i].key, GRPC_ARG_HTTP2_MAX_PING_STRIKES)) { g_default_max_ping_strikes = grpc_channel_arg_get_integer( -- cgit v1.2.3 From 98efefcc1e4e94d97125e828b1ec99174c2ee1d4 Mon Sep 17 00:00:00 2001 From: Mehrdad Afshari Date: Thu, 28 Dec 2017 09:13:07 -0800 Subject: Relax log-level when polling engines are undefined --- src/core/lib/iomgr/ev_epoll1_linux.cc | 2 -- src/core/lib/iomgr/ev_epollex_linux.cc | 2 -- src/core/lib/iomgr/ev_epollsig_linux.cc | 2 -- 3 files changed, 6 deletions(-) (limited to 'src/core') diff --git a/src/core/lib/iomgr/ev_epoll1_linux.cc b/src/core/lib/iomgr/ev_epoll1_linux.cc index 0dda1d924c..2467b9a585 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.cc +++ b/src/core/lib/iomgr/ev_epoll1_linux.cc @@ -1258,8 +1258,6 @@ const grpc_event_engine_vtable* grpc_init_epoll1_linux(bool explicit_request) { /* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return * NULL */ const grpc_event_engine_vtable* grpc_init_epoll1_linux(bool explicit_request) { - gpr_log(GPR_ERROR, - "Skipping epoll1 becuase GRPC_LINUX_EPOLL is not defined."); return NULL; } #endif /* defined(GRPC_POSIX_SOCKET) */ diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc index 62643df697..931f3fc355 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.cc +++ b/src/core/lib/iomgr/ev_epollex_linux.cc @@ -1479,8 +1479,6 @@ const grpc_event_engine_vtable* grpc_init_epollex_linux( * NULL */ const grpc_event_engine_vtable* grpc_init_epollex_linux( bool explicitly_requested) { - gpr_log(GPR_ERROR, - "Skipping epollex becuase GRPC_LINUX_EPOLL is not defined."); return NULL; } #endif /* defined(GRPC_POSIX_SOCKET) */ diff --git a/src/core/lib/iomgr/ev_epollsig_linux.cc b/src/core/lib/iomgr/ev_epollsig_linux.cc index 12c8483b8e..7ff043f5f9 100644 --- a/src/core/lib/iomgr/ev_epollsig_linux.cc +++ b/src/core/lib/iomgr/ev_epollsig_linux.cc @@ -1758,8 +1758,6 @@ const grpc_event_engine_vtable* grpc_init_epollsig_linux( * NULL */ const grpc_event_engine_vtable* grpc_init_epollsig_linux( bool explicit_request) { - gpr_log(GPR_ERROR, - "Skipping epollsig becuase GRPC_LINUX_EPOLL is not defined."); return NULL; } #endif /* defined(GRPC_POSIX_SOCKET) */ -- cgit v1.2.3 From b78df7ab779d53beb25f46099c467855f009924f Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Mon, 1 Jan 2018 15:04:38 -0800 Subject: Fix struct names --- src/core/lib/iomgr/ev_poll_posix.cc | 17 +++++++++-------- src/core/lib/iomgr/wakeup_fd_cv.cc | 8 ++++---- src/core/lib/iomgr/wakeup_fd_cv.h | 24 ++++++++++++------------ 3 files changed, 25 insertions(+), 24 deletions(-) (limited to 'src/core') diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc index a569f674f6..7ea1dfaa80 100644 --- a/src/core/lib/iomgr/ev_poll_posix.cc +++ b/src/core/lib/iomgr/ev_poll_posix.cc @@ -243,7 +243,7 @@ struct grpc_pollset_set { typedef struct poll_result { gpr_refcount refcount; - cv_node* watchers; + grpc_cv_node* watchers; int watchcount; struct pollfd* fds; nfds_t nfds; @@ -274,7 +274,7 @@ typedef struct poll_hash_table { } poll_hash_table; poll_hash_table poll_cache; -cv_fd_table g_cvfds; +grpc_cv_fd_table g_cvfds; /******************************************************************************* * fd_posix.c @@ -1444,7 +1444,7 @@ static void decref_poll_result(poll_result* res) { } } -void remove_cvn(cv_node** head, cv_node* target) { +void remove_cvn(grpc_cv_node** head, grpc_cv_node* target) { if (target->next) { target->next->prev = target->prev; } @@ -1469,7 +1469,7 @@ static void run_poll(void* args) { result->completed = 1; result->retval = retval; result->err = errno; - cv_node* watcher = result->watchers; + grpc_cv_node* watcher = result->watchers; while (watcher) { gpr_cv_signal(watcher->cv); watcher = watcher->next; @@ -1503,17 +1503,17 @@ static void run_poll(void* args) { static int cvfd_poll(struct pollfd* fds, nfds_t nfds, int timeout) { unsigned int i; int res, idx; - cv_node* pollcv; + grpc_cv_node* pollcv; int skip_poll = 0; nfds_t nsockfds = 0; poll_result* result = nullptr; gpr_mu_lock(&g_cvfds.mu); - pollcv = (cv_node*)gpr_malloc(sizeof(cv_node)); + pollcv = (grpc_cv_node*)gpr_malloc(sizeof(grpc_cv_node)); pollcv->next = nullptr; gpr_cv pollcv_cv; gpr_cv_init(&pollcv_cv); pollcv->cv = &pollcv_cv; - cv_node* fd_cvs = (cv_node*)gpr_malloc(nfds * sizeof(cv_node)); + grpc_cv_node* fd_cvs = (grpc_cv_node*)gpr_malloc(nfds * sizeof(grpc_cv_node)); for (i = 0; i < nfds; i++) { fds[i].revents = 0; @@ -1609,7 +1609,8 @@ static void global_cv_fd_table_init() { gpr_cv_init(&g_cvfds.shutdown_cv); gpr_ref_init(&g_cvfds.pollcount, 1); g_cvfds.size = CV_DEFAULT_TABLE_SIZE; - g_cvfds.cvfds = (fd_node*)gpr_malloc(sizeof(fd_node) * CV_DEFAULT_TABLE_SIZE); + g_cvfds.cvfds = + (grpc_fd_node*)gpr_malloc(sizeof(grpc_fd_node) * CV_DEFAULT_TABLE_SIZE); g_cvfds.free_fds = nullptr; thread_grace = gpr_time_from_millis(POLLCV_THREAD_GRACE_MS, GPR_TIMESPAN); for (int i = 0; i < CV_DEFAULT_TABLE_SIZE; i++) { diff --git a/src/core/lib/iomgr/wakeup_fd_cv.cc b/src/core/lib/iomgr/wakeup_fd_cv.cc index 5c1f16d3fc..c785114212 100644 --- a/src/core/lib/iomgr/wakeup_fd_cv.cc +++ b/src/core/lib/iomgr/wakeup_fd_cv.cc @@ -34,7 +34,7 @@ #define MAX_TABLE_RESIZE 256 -extern cv_fd_table g_cvfds; +extern grpc_cv_fd_table g_cvfds; static grpc_error* cv_fd_init(grpc_wakeup_fd* fd_info) { unsigned int i, newsize; @@ -42,8 +42,8 @@ static grpc_error* cv_fd_init(grpc_wakeup_fd* fd_info) { gpr_mu_lock(&g_cvfds.mu); if (!g_cvfds.free_fds) { newsize = GPR_MIN(g_cvfds.size * 2, g_cvfds.size + MAX_TABLE_RESIZE); - g_cvfds.cvfds = - (fd_node*)gpr_realloc(g_cvfds.cvfds, sizeof(fd_node) * newsize); + g_cvfds.cvfds = (grpc_fd_node*)gpr_realloc(g_cvfds.cvfds, + sizeof(grpc_fd_node) * newsize); for (i = g_cvfds.size; i < newsize; i++) { g_cvfds.cvfds[i].is_set = 0; g_cvfds.cvfds[i].cvs = nullptr; @@ -64,7 +64,7 @@ static grpc_error* cv_fd_init(grpc_wakeup_fd* fd_info) { } static grpc_error* cv_fd_wakeup(grpc_wakeup_fd* fd_info) { - cv_node* cvn; + grpc_cv_node* cvn; gpr_mu_lock(&g_cvfds.mu); g_cvfds.cvfds[GRPC_FD_TO_IDX(fd_info->read_fd)].is_set = 1; cvn = g_cvfds.cvfds[GRPC_FD_TO_IDX(fd_info->read_fd)].cvs; diff --git a/src/core/lib/iomgr/wakeup_fd_cv.h b/src/core/lib/iomgr/wakeup_fd_cv.h index 017e41bfa8..399620af76 100644 --- a/src/core/lib/iomgr/wakeup_fd_cv.h +++ b/src/core/lib/iomgr/wakeup_fd_cv.h @@ -40,27 +40,27 @@ #define GRPC_FD_TO_IDX(fd) (-(fd)-1) #define GRPC_IDX_TO_FD(idx) (-(idx)-1) -typedef struct cv_node { +typedef struct grpc_cv_node { gpr_cv* cv; - struct cv_node* next; - struct cv_node* prev; -} cv_node; + struct grpc_cv_node* next; + struct grpc_cv_node* prev; +} grpc_cv_node; -typedef struct fd_node { +typedef struct grpc_fd_node { int is_set; - cv_node* cvs; - struct fd_node* next_free; -} fd_node; + grpc_cv_node* cvs; + struct grpc_fd_node* next_free; +} grpc_fd_node; -typedef struct cv_fd_table { +typedef struct grpc_cv_fd_table { gpr_mu mu; gpr_refcount pollcount; gpr_cv shutdown_cv; - fd_node* cvfds; - fd_node* free_fds; + grpc_fd_node* cvfds; + grpc_fd_node* free_fds; unsigned int size; grpc_poll_function_type poll; -} cv_fd_table; +} grpc_cv_fd_table; extern const grpc_wakeup_fd_vtable grpc_cv_wakeup_fd_vtable; -- cgit v1.2.3