diff options
author | 2018-01-02 11:44:03 -0800 | |
---|---|---|
committer | 2018-01-02 11:44:03 -0800 | |
commit | 24902641d434005bee6641e62fb2b57fc0b6bd87 (patch) | |
tree | d9ab96b6531e30ca28044d571f2941f05cad52cb /src/core | |
parent | d230ad086a894cc963235b27814e19bf686eb7aa (diff) | |
parent | 63392f682e21543099926251b642cdcd0be2a17f (diff) |
Merge branch 'master' of https://github.com/grpc/grpc into flow-control-part4
Diffstat (limited to 'src/core')
19 files changed, 293 insertions, 227 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 3c64213fb9..dd6fc602ab 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -113,13 +113,13 @@ #include "src/core/lib/slice/slice_hash_table.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" +#include "src/core/lib/support/manual_constructor.h" #include "src/core/lib/support/string.h" #include "src/core/lib/surface/call.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/surface/channel_init.h" #include "src/core/lib/transport/static_metadata.h" -#define GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS 20 #define GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS 1 #define GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER 1.6 #define GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS 120 @@ -408,7 +408,7 @@ typedef struct glb_lb_policy { grpc_slice lb_call_status_details; /** LB call retry backoff state */ - grpc_backoff lb_call_backoff_state; + grpc_core::ManualConstructor<grpc_core::BackOff> lb_call_backoff; /** LB call retry timer */ grpc_timer lb_call_retry_timer; @@ -1167,7 +1167,7 @@ static void start_picking_locked(glb_lb_policy* glb_policy) { } glb_policy->started_picking = true; - grpc_backoff_reset(&glb_policy->lb_call_backoff_state); + glb_policy->lb_call_backoff->Reset(); query_for_backends_locked(glb_policy); } @@ -1302,8 +1302,7 @@ static void maybe_restart_lb_call(glb_lb_policy* glb_policy) { glb_policy->updating_lb_call = false; } else if (!glb_policy->shutting_down) { /* if we aren't shutting down, restart the LB client call after some time */ - grpc_millis next_try = grpc_backoff_step(&glb_policy->lb_call_backoff_state) - .next_attempt_start_time; + grpc_millis next_try = glb_policy->lb_call_backoff->Step(); if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_DEBUG, "[grpclb %p] Connection to LB server lost...", glb_policy); @@ -1463,12 +1462,14 @@ static void lb_call_init_locked(glb_lb_policy* glb_policy) { lb_on_response_received_locked, glb_policy, grpc_combiner_scheduler(glb_policy->base.combiner)); - grpc_backoff_init(&glb_policy->lb_call_backoff_state, - GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS * 1000, - GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER, - GRPC_GRPCLB_RECONNECT_JITTER, - GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS * 1000, - GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000); + grpc_core::BackOff::Options backoff_options; + backoff_options + .set_initial_backoff(GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS * 1000) + .set_multiplier(GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER) + .set_jitter(GRPC_GRPCLB_RECONNECT_JITTER) + .set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000); + + glb_policy->lb_call_backoff.Init(backoff_options); glb_policy->seen_initial_response = false; glb_policy->last_client_load_report_counters_were_zero = false; @@ -1572,7 +1573,7 @@ static void lb_on_response_received_locked(void* arg, grpc_error* error) { memset(ops, 0, sizeof(ops)); grpc_op* op = ops; if (glb_policy->lb_response_payload != nullptr) { - grpc_backoff_reset(&glb_policy->lb_call_backoff_state); + glb_policy->lb_call_backoff->Reset(); /* Received data from the LB server. Look inside * glb_policy->lb_response_payload, for a serverlist. */ grpc_byte_buffer_reader bbr; diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc index 4ec4477c82..4659a5f3ed 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc @@ -40,10 +40,10 @@ #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/json/json.h" #include "src/core/lib/support/env.h" +#include "src/core/lib/support/manual_constructor.h" #include "src/core/lib/support/string.h" #include "src/core/lib/transport/service_config.h" -#define GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS 1 #define GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS 1 #define GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER 1.6 #define GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS 120 @@ -89,7 +89,7 @@ typedef struct { bool have_retry_timer; grpc_timer retry_timer; /** retry backoff state */ - grpc_backoff backoff_state; + grpc_core::ManualConstructor<grpc_core::BackOff> backoff; /** currently resolving addresses */ grpc_lb_addresses* lb_addresses; @@ -131,7 +131,7 @@ static void dns_ares_shutdown_locked(grpc_resolver* resolver) { static void dns_ares_channel_saw_error_locked(grpc_resolver* resolver) { ares_dns_resolver* r = (ares_dns_resolver*)resolver; if (!r->resolving) { - grpc_backoff_reset(&r->backoff_state); + r->backoff->Reset(); dns_ares_start_resolving_locked(r); } } @@ -264,8 +264,7 @@ static void dns_ares_on_resolved_locked(void* arg, grpc_error* error) { } else { const char* msg = grpc_error_string(error); gpr_log(GPR_DEBUG, "dns resolution failed: %s", msg); - grpc_millis next_try = - grpc_backoff_step(&r->backoff_state).next_attempt_start_time; + grpc_millis next_try = r->backoff->Step(); grpc_millis timeout = next_try - grpc_core::ExecCtx::Get()->Now(); gpr_log(GPR_INFO, "dns resolution failed (will retry): %s", grpc_error_string(error)); @@ -298,7 +297,7 @@ static void dns_ares_next_locked(grpc_resolver* resolver, r->next_completion = on_complete; r->target_result = target_result; if (r->resolved_version == 0 && !r->resolving) { - grpc_backoff_reset(&r->backoff_state); + r->backoff->Reset(); dns_ares_start_resolving_locked(r); } else { dns_ares_maybe_finish_next_locked(r); @@ -368,11 +367,13 @@ static grpc_resolver* dns_ares_create(grpc_resolver_args* args, if (args->pollset_set != nullptr) { grpc_pollset_set_add_pollset_set(r->interested_parties, args->pollset_set); } - grpc_backoff_init( - &r->backoff_state, GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS * 1000, - GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER, GRPC_DNS_RECONNECT_JITTER, - GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS * 1000, - GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000); + grpc_core::BackOff::Options backoff_options; + backoff_options + .set_initial_backoff(GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS * 1000) + .set_multiplier(GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER) + .set_jitter(GRPC_DNS_RECONNECT_JITTER) + .set_max_backoff(GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000); + r->backoff.Init(grpc_core::BackOff(backoff_options)); GRPC_CLOSURE_INIT(&r->dns_ares_on_retry_timer_locked, dns_ares_on_retry_timer_locked, r, grpc_combiner_scheduler(r->base.combiner)); diff --git a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc index 77698e97aa..1c2cfc08e7 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc @@ -33,9 +33,9 @@ #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/support/env.h" +#include "src/core/lib/support/manual_constructor.h" #include "src/core/lib/support/string.h" -#define GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS 1 #define GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS 1 #define GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER 1.6 #define GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS 120 @@ -70,7 +70,7 @@ typedef struct { grpc_timer retry_timer; grpc_closure on_retry; /** retry backoff state */ - grpc_backoff backoff_state; + grpc_core::ManualConstructor<grpc_core::BackOff> backoff; /** currently resolving addresses */ grpc_resolved_addresses* addresses; @@ -106,7 +106,7 @@ static void dns_shutdown_locked(grpc_resolver* resolver) { static void dns_channel_saw_error_locked(grpc_resolver* resolver) { dns_resolver* r = (dns_resolver*)resolver; if (!r->resolving) { - grpc_backoff_reset(&r->backoff_state); + r->backoff->Reset(); dns_start_resolving_locked(r); } } @@ -119,7 +119,7 @@ static void dns_next_locked(grpc_resolver* resolver, r->next_completion = on_complete; r->target_result = target_result; if (r->resolved_version == 0 && !r->resolving) { - grpc_backoff_reset(&r->backoff_state); + r->backoff->Reset(); dns_start_resolving_locked(r); } else { dns_maybe_finish_next_locked(r); @@ -161,8 +161,7 @@ static void dns_on_resolved_locked(void* arg, grpc_error* error) { grpc_resolved_addresses_destroy(r->addresses); grpc_lb_addresses_destroy(addresses); } else { - grpc_millis next_try = - grpc_backoff_step(&r->backoff_state).next_attempt_start_time; + grpc_millis next_try = r->backoff->Step(); grpc_millis timeout = next_try - grpc_core::ExecCtx::Get()->Now(); gpr_log(GPR_INFO, "dns resolution failed (will retry): %s", grpc_error_string(error)); @@ -244,11 +243,13 @@ static grpc_resolver* dns_create(grpc_resolver_args* args, if (args->pollset_set != nullptr) { grpc_pollset_set_add_pollset_set(r->interested_parties, args->pollset_set); } - grpc_backoff_init( - &r->backoff_state, GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS * 1000, - GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER, GRPC_DNS_RECONNECT_JITTER, - GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS * 1000, - GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000); + grpc_core::BackOff::Options backoff_options; + backoff_options + .set_initial_backoff(GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS * 1000) + .set_multiplier(GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER) + .set_jitter(GRPC_DNS_RECONNECT_JITTER) + .set_max_backoff(GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000); + r->backoff.Init(grpc_core::BackOff(backoff_options)); return &r->base; } diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 84a5ace31d..f07394d29b 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -20,7 +20,9 @@ #include <inttypes.h> #include <limits.h> -#include <string.h> + +#include <algorithm> +#include <cstring> #include <grpc/support/alloc.h> #include <grpc/support/avl.h> @@ -39,6 +41,7 @@ #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" +#include "src/core/lib/support/manual_constructor.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/surface/channel_init.h" #include "src/core/lib/transport/connectivity_state.h" @@ -48,7 +51,7 @@ #define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1 #define GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER 1.6 -#define GRPC_SUBCHANNEL_RECONNECT_MIN_BACKOFF_SECONDS 20 +#define GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS 20 #define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120 #define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2 @@ -118,8 +121,9 @@ struct grpc_subchannel { external_state_watcher root_external_state_watcher; /** backoff state */ - grpc_backoff backoff_state; - grpc_backoff_result backoff_result; + grpc_core::ManualConstructor<grpc_core::BackOff> backoff; + grpc_millis next_attempt_deadline; + grpc_millis min_connect_timeout_ms; /** do we have an active alarm? */ bool have_alarm; @@ -274,6 +278,54 @@ void grpc_subchannel_weak_unref( } } +static void parse_args_for_backoff_values( + const grpc_channel_args* args, grpc_core::BackOff::Options* backoff_options, + grpc_millis* min_connect_timeout_ms) { + grpc_millis initial_backoff_ms = + GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000; + *min_connect_timeout_ms = + GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS * 1000; + grpc_millis max_backoff_ms = + GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000; + bool fixed_reconnect_backoff = false; + if (args != nullptr) { + for (size_t i = 0; i < args->num_args; i++) { + if (0 == strcmp(args->args[i].key, + "grpc.testing.fixed_reconnect_backoff_ms")) { + fixed_reconnect_backoff = true; + initial_backoff_ms = *min_connect_timeout_ms = max_backoff_ms = + grpc_channel_arg_get_integer( + &args->args[i], + {static_cast<int>(initial_backoff_ms), 100, INT_MAX}); + } else if (0 == + strcmp(args->args[i].key, GRPC_ARG_MIN_RECONNECT_BACKOFF_MS)) { + fixed_reconnect_backoff = false; + *min_connect_timeout_ms = grpc_channel_arg_get_integer( + &args->args[i], + {static_cast<int>(*min_connect_timeout_ms), 100, INT_MAX}); + } else if (0 == + strcmp(args->args[i].key, GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) { + fixed_reconnect_backoff = false; + max_backoff_ms = grpc_channel_arg_get_integer( + &args->args[i], {static_cast<int>(max_backoff_ms), 100, INT_MAX}); + } else if (0 == strcmp(args->args[i].key, + GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS)) { + fixed_reconnect_backoff = false; + initial_backoff_ms = grpc_channel_arg_get_integer( + &args->args[i], + {static_cast<int>(initial_backoff_ms), 100, INT_MAX}); + } + } + } + backoff_options->set_initial_backoff(initial_backoff_ms) + .set_multiplier(fixed_reconnect_backoff + ? 1.0 + : GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER) + .set_jitter(fixed_reconnect_backoff ? 0.0 + : GRPC_SUBCHANNEL_RECONNECT_JITTER) + .set_max_backoff(max_backoff_ms); +} + grpc_subchannel* grpc_subchannel_create(grpc_connector* connector, const grpc_subchannel_args* args) { grpc_subchannel_key* key = grpc_subchannel_key_create(args); @@ -324,43 +376,10 @@ grpc_subchannel* grpc_subchannel_create(grpc_connector* connector, grpc_schedule_on_exec_ctx); grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE, "subchannel"); - int initial_backoff_ms = - GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000; - int min_backoff_ms = GRPC_SUBCHANNEL_RECONNECT_MIN_BACKOFF_SECONDS * 1000; - int max_backoff_ms = GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000; - bool fixed_reconnect_backoff = false; - if (c->args) { - for (size_t i = 0; i < c->args->num_args; i++) { - if (0 == strcmp(c->args->args[i].key, - "grpc.testing.fixed_reconnect_backoff_ms")) { - fixed_reconnect_backoff = true; - initial_backoff_ms = min_backoff_ms = max_backoff_ms = - grpc_channel_arg_get_integer(&c->args->args[i], - {initial_backoff_ms, 100, INT_MAX}); - } else if (0 == strcmp(c->args->args[i].key, - GRPC_ARG_MIN_RECONNECT_BACKOFF_MS)) { - fixed_reconnect_backoff = false; - min_backoff_ms = grpc_channel_arg_get_integer( - &c->args->args[i], {min_backoff_ms, 100, INT_MAX}); - } else if (0 == strcmp(c->args->args[i].key, - GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) { - fixed_reconnect_backoff = false; - max_backoff_ms = grpc_channel_arg_get_integer( - &c->args->args[i], {max_backoff_ms, 100, INT_MAX}); - } else if (0 == strcmp(c->args->args[i].key, - GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS)) { - fixed_reconnect_backoff = false; - initial_backoff_ms = grpc_channel_arg_get_integer( - &c->args->args[i], {initial_backoff_ms, 100, INT_MAX}); - } - } - } - grpc_backoff_init( - &c->backoff_state, initial_backoff_ms, - fixed_reconnect_backoff ? 1.0 - : GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER, - fixed_reconnect_backoff ? 0.0 : GRPC_SUBCHANNEL_RECONNECT_JITTER, - min_backoff_ms, max_backoff_ms); + grpc_core::BackOff::Options backoff_options; + parse_args_for_backoff_values(args->args, &backoff_options, + &c->min_connect_timeout_ms); + c->backoff.Init(backoff_options); gpr_mu_init(&c->mu); return grpc_subchannel_index_register(key, c); @@ -368,11 +387,11 @@ grpc_subchannel* grpc_subchannel_create(grpc_connector* connector, static void continue_connect_locked(grpc_subchannel* c) { grpc_connect_in_args args; - args.interested_parties = c->pollset_set; - args.deadline = c->backoff_result.current_deadline; + const grpc_millis min_deadline = + c->min_connect_timeout_ms + grpc_core::ExecCtx::Get()->Now(); + args.deadline = std::max(c->next_attempt_deadline, min_deadline); args.channel_args = c->args; - grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE, "state_change"); grpc_connector_connect(c->connector, &args, &c->connecting_result, @@ -416,7 +435,7 @@ static void on_alarm(void* arg, grpc_error* error) { } if (error == GRPC_ERROR_NONE) { gpr_log(GPR_INFO, "Failed to connect to channel, retrying"); - c->backoff_result = grpc_backoff_step(&c->backoff_state); + c->next_attempt_deadline = c->backoff->Step(); continue_connect_locked(c); gpr_mu_unlock(&c->mu); } else { @@ -452,22 +471,20 @@ static void maybe_start_connecting_locked(grpc_subchannel* c) { if (!c->backoff_begun) { c->backoff_begun = true; - c->backoff_result = grpc_backoff_begin(&c->backoff_state); + c->next_attempt_deadline = c->backoff->Begin(); continue_connect_locked(c); } else { GPR_ASSERT(!c->have_alarm); c->have_alarm = true; const grpc_millis time_til_next = - c->backoff_result.next_attempt_start_time - - grpc_core::ExecCtx::Get()->Now(); + c->next_attempt_deadline - grpc_core::ExecCtx::Get()->Now(); if (time_til_next <= 0) { gpr_log(GPR_INFO, "Retry immediately"); } else { gpr_log(GPR_INFO, "Retry in %" PRIdPTR " milliseconds", time_til_next); } GRPC_CLOSURE_INIT(&c->on_alarm, on_alarm, c, grpc_schedule_on_exec_ctx); - grpc_timer_init(&c->alarm, c->backoff_result.next_attempt_start_time, - &c->on_alarm); + grpc_timer_init(&c->alarm, c->next_attempt_deadline, &c->on_alarm); } } diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index c13c4056c1..5ee1f08e61 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -79,7 +79,9 @@ static int g_default_server_keepalive_time_ms = DEFAULT_SERVER_KEEPALIVE_TIME_MS; static int g_default_server_keepalive_timeout_ms = DEFAULT_SERVER_KEEPALIVE_TIMEOUT_MS; -static bool g_default_keepalive_permit_without_calls = +static bool g_default_client_keepalive_permit_without_calls = + DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS; +static bool g_default_server_keepalive_permit_without_calls = DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS; static int g_default_min_sent_ping_interval_without_data_ms = @@ -346,6 +348,8 @@ static void init_transport(grpc_chttp2_transport* t, t->keepalive_timeout = g_default_client_keepalive_timeout_ms == INT_MAX ? GRPC_MILLIS_INF_FUTURE : g_default_client_keepalive_timeout_ms; + t->keepalive_permit_without_calls = + g_default_client_keepalive_permit_without_calls; } else { t->keepalive_time = g_default_server_keepalive_time_ms == INT_MAX ? GRPC_MILLIS_INF_FUTURE @@ -353,8 +357,9 @@ static void init_transport(grpc_chttp2_transport* t, t->keepalive_timeout = g_default_server_keepalive_timeout_ms == INT_MAX ? GRPC_MILLIS_INF_FUTURE : g_default_server_keepalive_timeout_ms; + t->keepalive_permit_without_calls = + g_default_server_keepalive_permit_without_calls; } - t->keepalive_permit_without_calls = g_default_keepalive_permit_without_calls; t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY; @@ -2550,7 +2555,9 @@ void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args, for (i = 0; i < args->num_args; i++) { if (0 == strcmp(args->args[i].key, GRPC_ARG_KEEPALIVE_TIME_MS)) { const int value = grpc_channel_arg_get_integer( - &args->args[i], {g_default_client_keepalive_time_ms, 1, INT_MAX}); + &args->args[i], {is_client ? g_default_client_keepalive_time_ms + : g_default_server_keepalive_time_ms, + 1, INT_MAX}); if (is_client) { g_default_client_keepalive_time_ms = value; } else { @@ -2559,8 +2566,9 @@ void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args, } else if (0 == strcmp(args->args[i].key, GRPC_ARG_KEEPALIVE_TIMEOUT_MS)) { const int value = grpc_channel_arg_get_integer( - &args->args[i], - {g_default_client_keepalive_timeout_ms, 0, INT_MAX}); + &args->args[i], {is_client ? g_default_client_keepalive_timeout_ms + : g_default_server_keepalive_timeout_ms, + 0, INT_MAX}); if (is_client) { g_default_client_keepalive_timeout_ms = value; } else { @@ -2568,10 +2576,16 @@ void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args, } } else if (0 == strcmp(args->args[i].key, GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)) { - g_default_keepalive_permit_without_calls = - (uint32_t)grpc_channel_arg_get_integer( - &args->args[i], - {g_default_keepalive_permit_without_calls, 0, 1}); + const bool value = (uint32_t)grpc_channel_arg_get_integer( + &args->args[i], + {is_client ? g_default_client_keepalive_permit_without_calls + : g_default_server_keepalive_timeout_ms, + 0, 1}); + if (is_client) { + g_default_client_keepalive_permit_without_calls = value; + } else { + g_default_server_keepalive_permit_without_calls = value; + } } else if (0 == strcmp(args->args[i].key, GRPC_ARG_HTTP2_MAX_PING_STRIKES)) { g_default_max_ping_strikes = grpc_channel_arg_get_integer( diff --git a/src/core/lib/backoff/backoff.cc b/src/core/lib/backoff/backoff.cc index da3b9b1b2d..41f625a636 100644 --- a/src/core/lib/backoff/backoff.cc +++ b/src/core/lib/backoff/backoff.cc @@ -18,61 +18,53 @@ #include "src/core/lib/backoff/backoff.h" +#include <algorithm> + #include <grpc/support/useful.h> -void grpc_backoff_init(grpc_backoff* backoff, grpc_millis initial_backoff, - double multiplier, double jitter, - grpc_millis min_connect_timeout, - grpc_millis max_backoff) { - backoff->initial_backoff = initial_backoff; - backoff->multiplier = multiplier; - backoff->jitter = jitter; - backoff->min_connect_timeout = min_connect_timeout; - backoff->max_backoff = max_backoff; - backoff->rng_state = (uint32_t)gpr_now(GPR_CLOCK_REALTIME).tv_nsec; -} +namespace grpc_core { -grpc_backoff_result grpc_backoff_begin(grpc_backoff* backoff) { - backoff->current_backoff = backoff->initial_backoff; - const grpc_millis initial_timeout = - GPR_MAX(backoff->initial_backoff, backoff->min_connect_timeout); - const grpc_millis now = grpc_core::ExecCtx::Get()->Now(); - const grpc_backoff_result result = {now + initial_timeout, - now + backoff->current_backoff}; - return result; -} +namespace { -/* Generate a random number between 0 and 1. */ -static double generate_uniform_random_number(uint32_t* rng_state) { - *rng_state = (1103515245 * *rng_state + 12345) % ((uint32_t)1 << 31); - return *rng_state / (double)((uint32_t)1 << 31); +/* Generate a random number between 0 and 1. We roll our own RNG because seeding + * rand() modifies a global variable we have no control over. */ +double generate_uniform_random_number(uint32_t* rng_state) { + constexpr uint32_t two_raise_31 = uint32_t(1) << 31; + *rng_state = (1103515245 * *rng_state + 12345) % two_raise_31; + return *rng_state / static_cast<double>(two_raise_31); } -static double generate_uniform_random_number_between(uint32_t* rng_state, - double a, double b) { +double generate_uniform_random_number_between(uint32_t* rng_state, double a, + double b) { if (a == b) return a; if (a > b) GPR_SWAP(double, a, b); // make sure a < b const double range = b - a; return a + generate_uniform_random_number(rng_state) * range; } +} // namespace -grpc_backoff_result grpc_backoff_step(grpc_backoff* backoff) { - backoff->current_backoff = (grpc_millis)(GPR_MIN( - backoff->current_backoff * backoff->multiplier, backoff->max_backoff)); - const double jitter = generate_uniform_random_number_between( - &backoff->rng_state, -backoff->jitter * backoff->current_backoff, - backoff->jitter * backoff->current_backoff); - const grpc_millis current_timeout = - GPR_MAX((grpc_millis)(backoff->current_backoff + jitter), - backoff->min_connect_timeout); - const grpc_millis next_timeout = GPR_MIN( - (grpc_millis)(backoff->current_backoff + jitter), backoff->max_backoff); - const grpc_millis now = grpc_core::ExecCtx::Get()->Now(); - const grpc_backoff_result result = {now + current_timeout, - now + next_timeout}; - return result; +BackOff::BackOff(const Options& options) : options_(options) { + rng_state_ = static_cast<uint32_t>(gpr_now(GPR_CLOCK_REALTIME).tv_nsec); +} + +grpc_millis BackOff::Begin() { + current_backoff_ = options_.initial_backoff(); + return current_backoff_ + grpc_core::ExecCtx::Get()->Now(); } -void grpc_backoff_reset(grpc_backoff* backoff) { - backoff->current_backoff = backoff->initial_backoff; +grpc_millis BackOff::Step() { + current_backoff_ = + (grpc_millis)(std::min(current_backoff_ * options_.multiplier(), + (double)options_.max_backoff())); + const double jitter = generate_uniform_random_number_between( + &rng_state_, -options_.jitter() * current_backoff_, + options_.jitter() * current_backoff_); + const grpc_millis next_timeout = (grpc_millis)(current_backoff_ + jitter); + return next_timeout + grpc_core::ExecCtx::Get()->Now(); } + +void BackOff::Reset() { current_backoff_ = options_.initial_backoff(); } + +void BackOff::SetRandomSeed(uint32_t seed) { rng_state_ = seed; } + +} // namespace grpc_core diff --git a/src/core/lib/backoff/backoff.h b/src/core/lib/backoff/backoff.h index f61d14ec95..84ef9b82e4 100644 --- a/src/core/lib/backoff/backoff.h +++ b/src/core/lib/backoff/backoff.h @@ -21,53 +21,69 @@ #include "src/core/lib/iomgr/exec_ctx.h" -typedef struct { - /// const: how long to wait after the first failure before retrying - grpc_millis initial_backoff; - - /// const: factor with which to multiply backoff after a failed retry - double multiplier; - - /// const: amount to randomize backoffs - double jitter; - - /// const: minimum time between retries - grpc_millis min_connect_timeout; - - /// const: maximum time between retries - grpc_millis max_backoff; - +namespace grpc_core { + +/// Implementation of the backoff mechanism described in +/// doc/connection-backoff.md +class BackOff { + public: + class Options; + + /// Initialize backoff machinery - does not need to be destroyed + explicit BackOff(const Options& options); + + /// Begin retry loop: returns the deadline to be used for the next attempt, + /// following the backoff strategy. + grpc_millis Begin(); + /// Step a retry loop: returns the deadline to be used for the next attempt, + /// following the backoff strategy. + grpc_millis Step(); + /// Reset the backoff, so the next grpc_backoff_step will be a + /// grpc_backoff_begin. + void Reset(); + + void SetRandomSeed(unsigned int seed); + + class Options { + public: + Options& set_initial_backoff(grpc_millis initial_backoff) { + initial_backoff_ = initial_backoff; + return *this; + } + Options& set_multiplier(double multiplier) { + multiplier_ = multiplier; + return *this; + } + Options& set_jitter(double jitter) { + jitter_ = jitter; + return *this; + } + Options& set_max_backoff(grpc_millis max_backoff) { + max_backoff_ = max_backoff; + return *this; + } + /// how long to wait after the first failure before retrying + grpc_millis initial_backoff() const { return initial_backoff_; } + /// factor with which to multiply backoff after a failed retry + double multiplier() const { return multiplier_; } + /// amount to randomize backoffs + double jitter() const { return jitter_; } + /// maximum time between retries + grpc_millis max_backoff() const { return max_backoff_; } + + private: + grpc_millis initial_backoff_; + double multiplier_; + double jitter_; + grpc_millis max_backoff_; + }; // class Options + + private: + const Options options_; /// current delay before retries - grpc_millis current_backoff; - - /// random number generator - uint32_t rng_state; -} grpc_backoff; - -typedef struct { - /// Deadline to be used for the current attempt. - grpc_millis current_deadline; - - /// Deadline to be used for the next attempt, following the backoff strategy. - grpc_millis next_attempt_start_time; -} grpc_backoff_result; - -/// Initialize backoff machinery - does not need to be destroyed -void grpc_backoff_init(grpc_backoff* backoff, grpc_millis initial_backoff, - double multiplier, double jitter, - grpc_millis min_connect_timeout, - grpc_millis max_backoff); - -/// Begin retry loop: returns the deadlines to be used for the current attempt -/// and the subsequent retry, if any. -grpc_backoff_result grpc_backoff_begin(grpc_backoff* backoff); - -/// Step a retry loop: returns the deadlines to be used for the current attempt -/// and the subsequent retry, if any. -grpc_backoff_result grpc_backoff_step(grpc_backoff* backoff); - -/// Reset the backoff, so the next grpc_backoff_step will be a -/// grpc_backoff_begin. -void grpc_backoff_reset(grpc_backoff* backoff); + grpc_millis current_backoff_; + uint32_t rng_state_; +}; +} // namespace grpc_core #endif /* GRPC_CORE_LIB_BACKOFF_BACKOFF_H */ diff --git a/src/core/lib/iomgr/error.cc b/src/core/lib/iomgr/error.cc index 42cd7c455d..67c3caf5ee 100644 --- a/src/core/lib/iomgr/error.cc +++ b/src/core/lib/iomgr/error.cc @@ -749,7 +749,7 @@ const char* grpc_error_string(grpc_error* err) { if (!gpr_atm_rel_cas(&err->atomics.error_string, 0, (gpr_atm)out)) { gpr_free(out); - out = (char*)gpr_atm_no_barrier_load(&err->atomics.error_string); + out = (char*)gpr_atm_acq_load(&err->atomics.error_string); } GPR_TIMER_END("grpc_error_string", 0); diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h index 4759ee0791..8c72a439f6 100644 --- a/src/core/lib/iomgr/error.h +++ b/src/core/lib/iomgr/error.h @@ -165,6 +165,8 @@ void grpc_error_unref(grpc_error* err); grpc_error* grpc_error_set_int(grpc_error* src, grpc_error_ints which, intptr_t value) GRPC_MUST_USE_RESULT; bool grpc_error_get_int(grpc_error* error, grpc_error_ints which, intptr_t* p); +/// This call takes ownership of the slice; the error is responsible for +/// eventually unref-ing it. grpc_error* grpc_error_set_str(grpc_error* src, grpc_error_strs which, grpc_slice str) GRPC_MUST_USE_RESULT; /// Returns false if the specified string is not set. @@ -174,7 +176,8 @@ bool grpc_error_get_str(grpc_error* error, grpc_error_strs which, /// Add a child error: an error that is believed to have contributed to this /// error occurring. Allows root causing high level errors from lower level -/// errors that contributed to them. +/// errors that contributed to them. The src error takes ownership of the +/// child error. grpc_error* grpc_error_add_child(grpc_error* src, grpc_error* child) GRPC_MUST_USE_RESULT; grpc_error* grpc_os_error(const char* file, int line, int err, diff --git a/src/core/lib/iomgr/ev_epoll1_linux.cc b/src/core/lib/iomgr/ev_epoll1_linux.cc index ae9d47ece5..1ab7e516de 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.cc +++ b/src/core/lib/iomgr/ev_epoll1_linux.cc @@ -1232,8 +1232,6 @@ const grpc_event_engine_vtable* grpc_init_epoll1_linux(bool explicit_request) { /* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return * NULL */ const grpc_event_engine_vtable* grpc_init_epoll1_linux(bool explicit_request) { - gpr_log(GPR_ERROR, - "Skipping epoll1 becuase GRPC_LINUX_EPOLL is not defined."); return nullptr; } #endif /* defined(GRPC_POSIX_SOCKET) */ diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc index b2817156a8..5f5f45a7a5 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.cc +++ b/src/core/lib/iomgr/ev_epollex_linux.cc @@ -1449,8 +1449,6 @@ const grpc_event_engine_vtable* grpc_init_epollex_linux( * NULL */ const grpc_event_engine_vtable* grpc_init_epollex_linux( bool explicitly_requested) { - gpr_log(GPR_ERROR, - "Skipping epollex becuase GRPC_LINUX_EPOLL is not defined."); return nullptr; } #endif /* defined(GRPC_POSIX_SOCKET) */ diff --git a/src/core/lib/iomgr/ev_epollsig_linux.cc b/src/core/lib/iomgr/ev_epollsig_linux.cc index 7a8962f4a8..8072a6cbed 100644 --- a/src/core/lib/iomgr/ev_epollsig_linux.cc +++ b/src/core/lib/iomgr/ev_epollsig_linux.cc @@ -1732,8 +1732,6 @@ const grpc_event_engine_vtable* grpc_init_epollsig_linux( * NULL */ const grpc_event_engine_vtable* grpc_init_epollsig_linux( bool explicit_request) { - gpr_log(GPR_ERROR, - "Skipping epollsig becuase GRPC_LINUX_EPOLL is not defined."); return nullptr; } #endif /* defined(GRPC_POSIX_SOCKET) */ diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc index 53de94fb6e..7ea1dfaa80 100644 --- a/src/core/lib/iomgr/ev_poll_posix.cc +++ b/src/core/lib/iomgr/ev_poll_posix.cc @@ -71,6 +71,7 @@ struct grpc_fd { int shutdown; int closed; int released; + gpr_atm pollhup; grpc_error* shutdown_error; /* The watcher list. @@ -242,7 +243,7 @@ struct grpc_pollset_set { typedef struct poll_result { gpr_refcount refcount; - cv_node* watchers; + grpc_cv_node* watchers; int watchcount; struct pollfd* fds; nfds_t nfds; @@ -273,7 +274,7 @@ typedef struct poll_hash_table { } poll_hash_table; poll_hash_table poll_cache; -cv_fd_table g_cvfds; +grpc_cv_fd_table g_cvfds; /******************************************************************************* * fd_posix.c @@ -335,6 +336,7 @@ static grpc_fd* fd_create(int fd, const char* name) { r->on_done_closure = nullptr; r->closed = 0; r->released = 0; + gpr_atm_no_barrier_store(&r->pollhup, 0); r->read_notifier_pollset = nullptr; char* name2; @@ -950,7 +952,8 @@ static grpc_error* pollset_work(grpc_pollset* pollset, pfds[0].events = POLLIN; pfds[0].revents = 0; for (i = 0; i < pollset->fd_count; i++) { - if (fd_is_orphaned(pollset->fds[i])) { + if (fd_is_orphaned(pollset->fds[i]) || + gpr_atm_no_barrier_load(&pollset->fds[i]->pollhup) == 1) { GRPC_FD_UNREF(pollset->fds[i], "multipoller"); } else { pollset->fds[fd_count++] = pollset->fds[i]; @@ -1017,6 +1020,12 @@ static grpc_error* pollset_work(grpc_pollset* pollset, pfds[i].fd, (pfds[i].revents & POLLIN_CHECK) != 0, (pfds[i].revents & POLLOUT_CHECK) != 0, pfds[i].revents); } + /* This is a mitigation to prevent poll() from spinning on a + ** POLLHUP https://github.com/grpc/grpc/pull/13665 + */ + if (pfds[i].revents & POLLHUP) { + gpr_atm_no_barrier_store(&watchers[i].fd->pollhup, 1); + } fd_end_poll(&watchers[i], pfds[i].revents & POLLIN_CHECK, pfds[i].revents & POLLOUT_CHECK, pollset); } @@ -1435,7 +1444,7 @@ static void decref_poll_result(poll_result* res) { } } -void remove_cvn(cv_node** head, cv_node* target) { +void remove_cvn(grpc_cv_node** head, grpc_cv_node* target) { if (target->next) { target->next->prev = target->prev; } @@ -1460,7 +1469,7 @@ static void run_poll(void* args) { result->completed = 1; result->retval = retval; result->err = errno; - cv_node* watcher = result->watchers; + grpc_cv_node* watcher = result->watchers; while (watcher) { gpr_cv_signal(watcher->cv); watcher = watcher->next; @@ -1494,17 +1503,17 @@ static void run_poll(void* args) { static int cvfd_poll(struct pollfd* fds, nfds_t nfds, int timeout) { unsigned int i; int res, idx; - cv_node* pollcv; + grpc_cv_node* pollcv; int skip_poll = 0; nfds_t nsockfds = 0; poll_result* result = nullptr; gpr_mu_lock(&g_cvfds.mu); - pollcv = (cv_node*)gpr_malloc(sizeof(cv_node)); + pollcv = (grpc_cv_node*)gpr_malloc(sizeof(grpc_cv_node)); pollcv->next = nullptr; gpr_cv pollcv_cv; gpr_cv_init(&pollcv_cv); pollcv->cv = &pollcv_cv; - cv_node* fd_cvs = (cv_node*)gpr_malloc(nfds * sizeof(cv_node)); + grpc_cv_node* fd_cvs = (grpc_cv_node*)gpr_malloc(nfds * sizeof(grpc_cv_node)); for (i = 0; i < nfds; i++) { fds[i].revents = 0; @@ -1600,7 +1609,8 @@ static void global_cv_fd_table_init() { gpr_cv_init(&g_cvfds.shutdown_cv); gpr_ref_init(&g_cvfds.pollcount, 1); g_cvfds.size = CV_DEFAULT_TABLE_SIZE; - g_cvfds.cvfds = (fd_node*)gpr_malloc(sizeof(fd_node) * CV_DEFAULT_TABLE_SIZE); + g_cvfds.cvfds = + (grpc_fd_node*)gpr_malloc(sizeof(grpc_fd_node) * CV_DEFAULT_TABLE_SIZE); g_cvfds.free_fds = nullptr; thread_grace = gpr_time_from_millis(POLLCV_THREAD_GRACE_MS, GPR_TIMESPAN); for (int i = 0; i < CV_DEFAULT_TABLE_SIZE; i++) { diff --git a/src/core/lib/iomgr/tcp_client_posix.cc b/src/core/lib/iomgr/tcp_client_posix.cc index 24ccab14b2..8cd5f8d618 100644 --- a/src/core/lib/iomgr/tcp_client_posix.cc +++ b/src/core/lib/iomgr/tcp_client_posix.cc @@ -212,6 +212,9 @@ finish: fd = nullptr; } done = (--ac->refs == 0); + // Create a copy of the data from "ac" to be accessed after the unlock, as + // "ac" and its contents may be deallocated by the time they are read. + const grpc_slice addr_str_slice = grpc_slice_from_copied_string(ac->addr_str); gpr_mu_unlock(&ac->mu); if (error != GRPC_ERROR_NONE) { char* error_descr; @@ -225,9 +228,13 @@ finish: gpr_free(error_descr); gpr_free(desc); error = grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS, - grpc_slice_from_copied_string(ac->addr_str)); + addr_str_slice /* takes ownership */); + } else { + grpc_slice_unref(addr_str_slice); } if (done) { + // This is safe even outside the lock, because "done", the sentinel, is + // populated *inside* the lock. gpr_mu_destroy(&ac->mu); gpr_free(ac->addr_str); grpc_channel_args_destroy(ac->channel_args); diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc index 55e0b165ec..4a97f3353d 100644 --- a/src/core/lib/iomgr/udp_server.cc +++ b/src/core/lib/iomgr/udp_server.cc @@ -72,6 +72,7 @@ struct grpc_udp_listener { grpc_udp_server_read_cb read_cb; grpc_udp_server_write_cb write_cb; grpc_udp_server_orphan_cb orphan_cb; + grpc_udp_server_start_cb start_cb; // To be scheduled on another thread to actually read/write. grpc_closure do_read_closure; grpc_closure do_write_closure; @@ -353,7 +354,7 @@ static void do_read(void* arg, grpc_error* error) { * read lock if available. */ gpr_mu_lock(&sp->server->mu); /* Tell the registered callback that data is available to read. */ - if (!sp->already_shutdown && sp->read_cb(sp->emfd, sp->server->user_data)) { + if (!sp->already_shutdown && sp->read_cb(sp->emfd)) { /* There maybe more packets to read. Schedule read_more_cb_ closure to run * after finishing this event loop. */ GRPC_CLOSURE_SCHED(&sp->do_read_closure, GRPC_ERROR_NONE); @@ -383,7 +384,7 @@ static void on_read(void* arg, grpc_error* error) { /* Read once. If there is more data to read, off load the work to another * thread to finish. */ GPR_ASSERT(sp->read_cb); - if (sp->read_cb(sp->emfd, sp->server->user_data)) { + if (sp->read_cb(sp->emfd)) { /* There maybe more packets to read. Schedule read_more_cb_ closure to run * after finishing this event loop. */ GRPC_CLOSURE_INIT(&sp->do_read_closure, do_read, arg, @@ -411,7 +412,7 @@ void fd_notify_on_write_wrapper(void* arg, grpc_error* error) { static void do_write(void* arg, grpc_error* error) { grpc_udp_listener* sp = reinterpret_cast<grpc_udp_listener*>(arg); - gpr_mu_lock(&(sp->server->mu)); + gpr_mu_lock(&sp->server->mu); if (sp->already_shutdown) { // If fd has been shutdown, don't write any more and re-arm notification. grpc_fd_notify_on_write(sp->emfd, &sp->write_closure); @@ -429,7 +430,7 @@ static void do_write(void* arg, grpc_error* error) { static void on_write(void* arg, grpc_error* error) { grpc_udp_listener* sp = (grpc_udp_listener*)arg; - gpr_mu_lock(&(sp->server->mu)); + gpr_mu_lock(&sp->server->mu); if (error != GRPC_ERROR_NONE) { if (0 == --sp->server->active_ports && sp->server->shutdown) { gpr_mu_unlock(&sp->server->mu); @@ -450,6 +451,7 @@ static void on_write(void* arg, grpc_error* error) { static int add_socket_to_server(grpc_udp_server* s, int fd, const grpc_resolved_address* addr, + grpc_udp_server_start_cb start_cb, grpc_udp_server_read_cb read_cb, grpc_udp_server_write_cb write_cb, grpc_udp_server_orphan_cb orphan_cb) { @@ -480,6 +482,7 @@ static int add_socket_to_server(grpc_udp_server* s, int fd, sp->read_cb = read_cb; sp->write_cb = write_cb; sp->orphan_cb = orphan_cb; + sp->start_cb = start_cb; sp->orphan_notified = false; sp->already_shutdown = false; GPR_ASSERT(sp->emfd); @@ -492,6 +495,7 @@ static int add_socket_to_server(grpc_udp_server* s, int fd, int grpc_udp_server_add_port(grpc_udp_server* s, const grpc_resolved_address* addr, + grpc_udp_server_start_cb start_cb, grpc_udp_server_read_cb read_cb, grpc_udp_server_write_cb write_cb, grpc_udp_server_orphan_cb orphan_cb) { @@ -541,8 +545,8 @@ int grpc_udp_server_add_port(grpc_udp_server* s, // TODO(rjshade): Test and propagate the returned grpc_error*: GRPC_ERROR_UNREF(grpc_create_dualstack_socket_using_factory( s->socket_factory, addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd)); - allocated_port1 = - add_socket_to_server(s, fd, addr, read_cb, write_cb, orphan_cb); + allocated_port1 = add_socket_to_server(s, fd, addr, start_cb, read_cb, + write_cb, orphan_cb); if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) { goto done; } @@ -565,7 +569,7 @@ int grpc_udp_server_add_port(grpc_udp_server* s, addr = &addr4_copy; } allocated_port2 = - add_socket_to_server(s, fd, addr, read_cb, write_cb, orphan_cb); + add_socket_to_server(s, fd, addr, start_cb, read_cb, write_cb, orphan_cb); done: gpr_free(allocated_addr); @@ -587,6 +591,7 @@ int grpc_udp_server_get_fd(grpc_udp_server* s, unsigned port_index) { void grpc_udp_server_start(grpc_udp_server* s, grpc_pollset** pollsets, size_t pollset_count, void* user_data) { + gpr_log(GPR_DEBUG, "grpc_udp_server_start"); size_t i; gpr_mu_lock(&s->mu); grpc_udp_listener* sp; @@ -596,6 +601,7 @@ void grpc_udp_server_start(grpc_udp_server* s, grpc_pollset** pollsets, sp = s->head; while (sp != nullptr) { + sp->start_cb(sp->emfd, sp->server->user_data); for (i = 0; i < pollset_count; i++) { grpc_pollset_add_fd(pollsets[i], sp->emfd); } diff --git a/src/core/lib/iomgr/udp_server.h b/src/core/lib/iomgr/udp_server.h index 02e3acb7f5..a469ab9be5 100644 --- a/src/core/lib/iomgr/udp_server.h +++ b/src/core/lib/iomgr/udp_server.h @@ -30,9 +30,12 @@ struct grpc_server; /* Forward decl of grpc_udp_server */ typedef struct grpc_udp_server grpc_udp_server; +/* Called when grpc server starts to listening on the grpc_fd. */ +typedef void (*grpc_udp_server_start_cb)(grpc_fd* emfd, void* user_data); + /* Called when data is available to read from the socket. * Return true if there is more data to read from fd. */ -typedef bool (*grpc_udp_server_read_cb)(grpc_fd* emfd, void* user_data); +typedef bool (*grpc_udp_server_read_cb)(grpc_fd* emfd); /* Called when the socket is writeable. The given closure should be scheduled * when the socket becomes blocked next time. */ @@ -65,6 +68,7 @@ int grpc_udp_server_get_fd(grpc_udp_server* s, unsigned port_index); all of the multiple socket port matching logic in one place */ int grpc_udp_server_add_port(grpc_udp_server* s, const grpc_resolved_address* addr, + grpc_udp_server_start_cb start_cb, grpc_udp_server_read_cb read_cb, grpc_udp_server_write_cb write_cb, grpc_udp_server_orphan_cb orphan_cb); diff --git a/src/core/lib/iomgr/wakeup_fd_cv.cc b/src/core/lib/iomgr/wakeup_fd_cv.cc index 5c1f16d3fc..c785114212 100644 --- a/src/core/lib/iomgr/wakeup_fd_cv.cc +++ b/src/core/lib/iomgr/wakeup_fd_cv.cc @@ -34,7 +34,7 @@ #define MAX_TABLE_RESIZE 256 -extern cv_fd_table g_cvfds; +extern grpc_cv_fd_table g_cvfds; static grpc_error* cv_fd_init(grpc_wakeup_fd* fd_info) { unsigned int i, newsize; @@ -42,8 +42,8 @@ static grpc_error* cv_fd_init(grpc_wakeup_fd* fd_info) { gpr_mu_lock(&g_cvfds.mu); if (!g_cvfds.free_fds) { newsize = GPR_MIN(g_cvfds.size * 2, g_cvfds.size + MAX_TABLE_RESIZE); - g_cvfds.cvfds = - (fd_node*)gpr_realloc(g_cvfds.cvfds, sizeof(fd_node) * newsize); + g_cvfds.cvfds = (grpc_fd_node*)gpr_realloc(g_cvfds.cvfds, + sizeof(grpc_fd_node) * newsize); for (i = g_cvfds.size; i < newsize; i++) { g_cvfds.cvfds[i].is_set = 0; g_cvfds.cvfds[i].cvs = nullptr; @@ -64,7 +64,7 @@ static grpc_error* cv_fd_init(grpc_wakeup_fd* fd_info) { } static grpc_error* cv_fd_wakeup(grpc_wakeup_fd* fd_info) { - cv_node* cvn; + grpc_cv_node* cvn; gpr_mu_lock(&g_cvfds.mu); g_cvfds.cvfds[GRPC_FD_TO_IDX(fd_info->read_fd)].is_set = 1; cvn = g_cvfds.cvfds[GRPC_FD_TO_IDX(fd_info->read_fd)].cvs; diff --git a/src/core/lib/iomgr/wakeup_fd_cv.h b/src/core/lib/iomgr/wakeup_fd_cv.h index 017e41bfa8..399620af76 100644 --- a/src/core/lib/iomgr/wakeup_fd_cv.h +++ b/src/core/lib/iomgr/wakeup_fd_cv.h @@ -40,27 +40,27 @@ #define GRPC_FD_TO_IDX(fd) (-(fd)-1) #define GRPC_IDX_TO_FD(idx) (-(idx)-1) -typedef struct cv_node { +typedef struct grpc_cv_node { gpr_cv* cv; - struct cv_node* next; - struct cv_node* prev; -} cv_node; + struct grpc_cv_node* next; + struct grpc_cv_node* prev; +} grpc_cv_node; -typedef struct fd_node { +typedef struct grpc_fd_node { int is_set; - cv_node* cvs; - struct fd_node* next_free; -} fd_node; + grpc_cv_node* cvs; + struct grpc_fd_node* next_free; +} grpc_fd_node; -typedef struct cv_fd_table { +typedef struct grpc_cv_fd_table { gpr_mu mu; gpr_refcount pollcount; gpr_cv shutdown_cv; - fd_node* cvfds; - fd_node* free_fds; + grpc_fd_node* cvfds; + grpc_fd_node* free_fds; unsigned int size; grpc_poll_function_type poll; -} cv_fd_table; +} grpc_cv_fd_table; extern const grpc_wakeup_fd_vtable grpc_cv_wakeup_fd_vtable; diff --git a/src/core/lib/support/debug_location.h b/src/core/lib/support/debug_location.h index 0939da595d..9b3f9220fc 100644 --- a/src/core/lib/support/debug_location.h +++ b/src/core/lib/support/debug_location.h @@ -36,7 +36,7 @@ class DebugLocation { const char* file_; const int line_; }; -#define DEBUG_LOCATION DebugLocation(__FILE__, __LINE__) +#define DEBUG_LOCATION ::grpc_core::DebugLocation(__FILE__, __LINE__) #else class DebugLocation { public: @@ -44,7 +44,7 @@ class DebugLocation { const char* file() const { return nullptr; } int line() const { return -1; } }; -#define DEBUG_LOCATION DebugLocation() +#define DEBUG_LOCATION ::grpc_core::DebugLocation() #endif } // namespace grpc_core |