aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar ncteisen <ncteisen@gmail.com>2018-01-02 11:44:03 -0800
committerGravatar ncteisen <ncteisen@gmail.com>2018-01-02 11:44:03 -0800
commit24902641d434005bee6641e62fb2b57fc0b6bd87 (patch)
treed9ab96b6531e30ca28044d571f2941f05cad52cb /src/core
parentd230ad086a894cc963235b27814e19bf686eb7aa (diff)
parent63392f682e21543099926251b642cdcd0be2a17f (diff)
Merge branch 'master' of https://github.com/grpc/grpc into flow-control-part4
Diffstat (limited to 'src/core')
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc25
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc23
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc23
-rw-r--r--src/core/ext/filters/client_channel/subchannel.cc117
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.cc32
-rw-r--r--src/core/lib/backoff/backoff.cc78
-rw-r--r--src/core/lib/backoff/backoff.h110
-rw-r--r--src/core/lib/iomgr/error.cc2
-rw-r--r--src/core/lib/iomgr/error.h5
-rw-r--r--src/core/lib/iomgr/ev_epoll1_linux.cc2
-rw-r--r--src/core/lib/iomgr/ev_epollex_linux.cc2
-rw-r--r--src/core/lib/iomgr/ev_epollsig_linux.cc2
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.cc28
-rw-r--r--src/core/lib/iomgr/tcp_client_posix.cc9
-rw-r--r--src/core/lib/iomgr/udp_server.cc20
-rw-r--r--src/core/lib/iomgr/udp_server.h6
-rw-r--r--src/core/lib/iomgr/wakeup_fd_cv.cc8
-rw-r--r--src/core/lib/iomgr/wakeup_fd_cv.h24
-rw-r--r--src/core/lib/support/debug_location.h4
19 files changed, 293 insertions, 227 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index 3c64213fb9..dd6fc602ab 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -113,13 +113,13 @@
#include "src/core/lib/slice/slice_hash_table.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
+#include "src/core/lib/support/manual_constructor.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/transport/static_metadata.h"
-#define GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS 20
#define GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS 120
@@ -408,7 +408,7 @@ typedef struct glb_lb_policy {
grpc_slice lb_call_status_details;
/** LB call retry backoff state */
- grpc_backoff lb_call_backoff_state;
+ grpc_core::ManualConstructor<grpc_core::BackOff> lb_call_backoff;
/** LB call retry timer */
grpc_timer lb_call_retry_timer;
@@ -1167,7 +1167,7 @@ static void start_picking_locked(glb_lb_policy* glb_policy) {
}
glb_policy->started_picking = true;
- grpc_backoff_reset(&glb_policy->lb_call_backoff_state);
+ glb_policy->lb_call_backoff->Reset();
query_for_backends_locked(glb_policy);
}
@@ -1302,8 +1302,7 @@ static void maybe_restart_lb_call(glb_lb_policy* glb_policy) {
glb_policy->updating_lb_call = false;
} else if (!glb_policy->shutting_down) {
/* if we aren't shutting down, restart the LB client call after some time */
- grpc_millis next_try = grpc_backoff_step(&glb_policy->lb_call_backoff_state)
- .next_attempt_start_time;
+ grpc_millis next_try = glb_policy->lb_call_backoff->Step();
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_DEBUG, "[grpclb %p] Connection to LB server lost...",
glb_policy);
@@ -1463,12 +1462,14 @@ static void lb_call_init_locked(glb_lb_policy* glb_policy) {
lb_on_response_received_locked, glb_policy,
grpc_combiner_scheduler(glb_policy->base.combiner));
- grpc_backoff_init(&glb_policy->lb_call_backoff_state,
- GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS * 1000,
- GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER,
- GRPC_GRPCLB_RECONNECT_JITTER,
- GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS * 1000,
- GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
+ grpc_core::BackOff::Options backoff_options;
+ backoff_options
+ .set_initial_backoff(GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS * 1000)
+ .set_multiplier(GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER)
+ .set_jitter(GRPC_GRPCLB_RECONNECT_JITTER)
+ .set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
+
+ glb_policy->lb_call_backoff.Init(backoff_options);
glb_policy->seen_initial_response = false;
glb_policy->last_client_load_report_counters_were_zero = false;
@@ -1572,7 +1573,7 @@ static void lb_on_response_received_locked(void* arg, grpc_error* error) {
memset(ops, 0, sizeof(ops));
grpc_op* op = ops;
if (glb_policy->lb_response_payload != nullptr) {
- grpc_backoff_reset(&glb_policy->lb_call_backoff_state);
+ glb_policy->lb_call_backoff->Reset();
/* Received data from the LB server. Look inside
* glb_policy->lb_response_payload, for a serverlist. */
grpc_byte_buffer_reader bbr;
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
index 4ec4477c82..4659a5f3ed 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
@@ -40,10 +40,10 @@
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/support/env.h"
+#include "src/core/lib/support/manual_constructor.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/transport/service_config.h"
-#define GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS 1
#define GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS 120
@@ -89,7 +89,7 @@ typedef struct {
bool have_retry_timer;
grpc_timer retry_timer;
/** retry backoff state */
- grpc_backoff backoff_state;
+ grpc_core::ManualConstructor<grpc_core::BackOff> backoff;
/** currently resolving addresses */
grpc_lb_addresses* lb_addresses;
@@ -131,7 +131,7 @@ static void dns_ares_shutdown_locked(grpc_resolver* resolver) {
static void dns_ares_channel_saw_error_locked(grpc_resolver* resolver) {
ares_dns_resolver* r = (ares_dns_resolver*)resolver;
if (!r->resolving) {
- grpc_backoff_reset(&r->backoff_state);
+ r->backoff->Reset();
dns_ares_start_resolving_locked(r);
}
}
@@ -264,8 +264,7 @@ static void dns_ares_on_resolved_locked(void* arg, grpc_error* error) {
} else {
const char* msg = grpc_error_string(error);
gpr_log(GPR_DEBUG, "dns resolution failed: %s", msg);
- grpc_millis next_try =
- grpc_backoff_step(&r->backoff_state).next_attempt_start_time;
+ grpc_millis next_try = r->backoff->Step();
grpc_millis timeout = next_try - grpc_core::ExecCtx::Get()->Now();
gpr_log(GPR_INFO, "dns resolution failed (will retry): %s",
grpc_error_string(error));
@@ -298,7 +297,7 @@ static void dns_ares_next_locked(grpc_resolver* resolver,
r->next_completion = on_complete;
r->target_result = target_result;
if (r->resolved_version == 0 && !r->resolving) {
- grpc_backoff_reset(&r->backoff_state);
+ r->backoff->Reset();
dns_ares_start_resolving_locked(r);
} else {
dns_ares_maybe_finish_next_locked(r);
@@ -368,11 +367,13 @@ static grpc_resolver* dns_ares_create(grpc_resolver_args* args,
if (args->pollset_set != nullptr) {
grpc_pollset_set_add_pollset_set(r->interested_parties, args->pollset_set);
}
- grpc_backoff_init(
- &r->backoff_state, GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS * 1000,
- GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER, GRPC_DNS_RECONNECT_JITTER,
- GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS * 1000,
- GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
+ grpc_core::BackOff::Options backoff_options;
+ backoff_options
+ .set_initial_backoff(GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS * 1000)
+ .set_multiplier(GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER)
+ .set_jitter(GRPC_DNS_RECONNECT_JITTER)
+ .set_max_backoff(GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
+ r->backoff.Init(grpc_core::BackOff(backoff_options));
GRPC_CLOSURE_INIT(&r->dns_ares_on_retry_timer_locked,
dns_ares_on_retry_timer_locked, r,
grpc_combiner_scheduler(r->base.combiner));
diff --git a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc
index 77698e97aa..1c2cfc08e7 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc
@@ -33,9 +33,9 @@
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/support/env.h"
+#include "src/core/lib/support/manual_constructor.h"
#include "src/core/lib/support/string.h"
-#define GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS 1
#define GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS 120
@@ -70,7 +70,7 @@ typedef struct {
grpc_timer retry_timer;
grpc_closure on_retry;
/** retry backoff state */
- grpc_backoff backoff_state;
+ grpc_core::ManualConstructor<grpc_core::BackOff> backoff;
/** currently resolving addresses */
grpc_resolved_addresses* addresses;
@@ -106,7 +106,7 @@ static void dns_shutdown_locked(grpc_resolver* resolver) {
static void dns_channel_saw_error_locked(grpc_resolver* resolver) {
dns_resolver* r = (dns_resolver*)resolver;
if (!r->resolving) {
- grpc_backoff_reset(&r->backoff_state);
+ r->backoff->Reset();
dns_start_resolving_locked(r);
}
}
@@ -119,7 +119,7 @@ static void dns_next_locked(grpc_resolver* resolver,
r->next_completion = on_complete;
r->target_result = target_result;
if (r->resolved_version == 0 && !r->resolving) {
- grpc_backoff_reset(&r->backoff_state);
+ r->backoff->Reset();
dns_start_resolving_locked(r);
} else {
dns_maybe_finish_next_locked(r);
@@ -161,8 +161,7 @@ static void dns_on_resolved_locked(void* arg, grpc_error* error) {
grpc_resolved_addresses_destroy(r->addresses);
grpc_lb_addresses_destroy(addresses);
} else {
- grpc_millis next_try =
- grpc_backoff_step(&r->backoff_state).next_attempt_start_time;
+ grpc_millis next_try = r->backoff->Step();
grpc_millis timeout = next_try - grpc_core::ExecCtx::Get()->Now();
gpr_log(GPR_INFO, "dns resolution failed (will retry): %s",
grpc_error_string(error));
@@ -244,11 +243,13 @@ static grpc_resolver* dns_create(grpc_resolver_args* args,
if (args->pollset_set != nullptr) {
grpc_pollset_set_add_pollset_set(r->interested_parties, args->pollset_set);
}
- grpc_backoff_init(
- &r->backoff_state, GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS * 1000,
- GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER, GRPC_DNS_RECONNECT_JITTER,
- GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS * 1000,
- GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
+ grpc_core::BackOff::Options backoff_options;
+ backoff_options
+ .set_initial_backoff(GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS * 1000)
+ .set_multiplier(GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER)
+ .set_jitter(GRPC_DNS_RECONNECT_JITTER)
+ .set_max_backoff(GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
+ r->backoff.Init(grpc_core::BackOff(backoff_options));
return &r->base;
}
diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc
index 84a5ace31d..f07394d29b 100644
--- a/src/core/ext/filters/client_channel/subchannel.cc
+++ b/src/core/ext/filters/client_channel/subchannel.cc
@@ -20,7 +20,9 @@
#include <inttypes.h>
#include <limits.h>
-#include <string.h>
+
+#include <algorithm>
+#include <cstring>
#include <grpc/support/alloc.h>
#include <grpc/support/avl.h>
@@ -39,6 +41,7 @@
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
+#include "src/core/lib/support/manual_constructor.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/transport/connectivity_state.h"
@@ -48,7 +51,7 @@
#define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER 1.6
-#define GRPC_SUBCHANNEL_RECONNECT_MIN_BACKOFF_SECONDS 20
+#define GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS 20
#define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2
@@ -118,8 +121,9 @@ struct grpc_subchannel {
external_state_watcher root_external_state_watcher;
/** backoff state */
- grpc_backoff backoff_state;
- grpc_backoff_result backoff_result;
+ grpc_core::ManualConstructor<grpc_core::BackOff> backoff;
+ grpc_millis next_attempt_deadline;
+ grpc_millis min_connect_timeout_ms;
/** do we have an active alarm? */
bool have_alarm;
@@ -274,6 +278,54 @@ void grpc_subchannel_weak_unref(
}
}
+static void parse_args_for_backoff_values(
+ const grpc_channel_args* args, grpc_core::BackOff::Options* backoff_options,
+ grpc_millis* min_connect_timeout_ms) {
+ grpc_millis initial_backoff_ms =
+ GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000;
+ *min_connect_timeout_ms =
+ GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS * 1000;
+ grpc_millis max_backoff_ms =
+ GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000;
+ bool fixed_reconnect_backoff = false;
+ if (args != nullptr) {
+ for (size_t i = 0; i < args->num_args; i++) {
+ if (0 == strcmp(args->args[i].key,
+ "grpc.testing.fixed_reconnect_backoff_ms")) {
+ fixed_reconnect_backoff = true;
+ initial_backoff_ms = *min_connect_timeout_ms = max_backoff_ms =
+ grpc_channel_arg_get_integer(
+ &args->args[i],
+ {static_cast<int>(initial_backoff_ms), 100, INT_MAX});
+ } else if (0 ==
+ strcmp(args->args[i].key, GRPC_ARG_MIN_RECONNECT_BACKOFF_MS)) {
+ fixed_reconnect_backoff = false;
+ *min_connect_timeout_ms = grpc_channel_arg_get_integer(
+ &args->args[i],
+ {static_cast<int>(*min_connect_timeout_ms), 100, INT_MAX});
+ } else if (0 ==
+ strcmp(args->args[i].key, GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) {
+ fixed_reconnect_backoff = false;
+ max_backoff_ms = grpc_channel_arg_get_integer(
+ &args->args[i], {static_cast<int>(max_backoff_ms), 100, INT_MAX});
+ } else if (0 == strcmp(args->args[i].key,
+ GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS)) {
+ fixed_reconnect_backoff = false;
+ initial_backoff_ms = grpc_channel_arg_get_integer(
+ &args->args[i],
+ {static_cast<int>(initial_backoff_ms), 100, INT_MAX});
+ }
+ }
+ }
+ backoff_options->set_initial_backoff(initial_backoff_ms)
+ .set_multiplier(fixed_reconnect_backoff
+ ? 1.0
+ : GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER)
+ .set_jitter(fixed_reconnect_backoff ? 0.0
+ : GRPC_SUBCHANNEL_RECONNECT_JITTER)
+ .set_max_backoff(max_backoff_ms);
+}
+
grpc_subchannel* grpc_subchannel_create(grpc_connector* connector,
const grpc_subchannel_args* args) {
grpc_subchannel_key* key = grpc_subchannel_key_create(args);
@@ -324,43 +376,10 @@ grpc_subchannel* grpc_subchannel_create(grpc_connector* connector,
grpc_schedule_on_exec_ctx);
grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE,
"subchannel");
- int initial_backoff_ms =
- GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000;
- int min_backoff_ms = GRPC_SUBCHANNEL_RECONNECT_MIN_BACKOFF_SECONDS * 1000;
- int max_backoff_ms = GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000;
- bool fixed_reconnect_backoff = false;
- if (c->args) {
- for (size_t i = 0; i < c->args->num_args; i++) {
- if (0 == strcmp(c->args->args[i].key,
- "grpc.testing.fixed_reconnect_backoff_ms")) {
- fixed_reconnect_backoff = true;
- initial_backoff_ms = min_backoff_ms = max_backoff_ms =
- grpc_channel_arg_get_integer(&c->args->args[i],
- {initial_backoff_ms, 100, INT_MAX});
- } else if (0 == strcmp(c->args->args[i].key,
- GRPC_ARG_MIN_RECONNECT_BACKOFF_MS)) {
- fixed_reconnect_backoff = false;
- min_backoff_ms = grpc_channel_arg_get_integer(
- &c->args->args[i], {min_backoff_ms, 100, INT_MAX});
- } else if (0 == strcmp(c->args->args[i].key,
- GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) {
- fixed_reconnect_backoff = false;
- max_backoff_ms = grpc_channel_arg_get_integer(
- &c->args->args[i], {max_backoff_ms, 100, INT_MAX});
- } else if (0 == strcmp(c->args->args[i].key,
- GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS)) {
- fixed_reconnect_backoff = false;
- initial_backoff_ms = grpc_channel_arg_get_integer(
- &c->args->args[i], {initial_backoff_ms, 100, INT_MAX});
- }
- }
- }
- grpc_backoff_init(
- &c->backoff_state, initial_backoff_ms,
- fixed_reconnect_backoff ? 1.0
- : GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER,
- fixed_reconnect_backoff ? 0.0 : GRPC_SUBCHANNEL_RECONNECT_JITTER,
- min_backoff_ms, max_backoff_ms);
+ grpc_core::BackOff::Options backoff_options;
+ parse_args_for_backoff_values(args->args, &backoff_options,
+ &c->min_connect_timeout_ms);
+ c->backoff.Init(backoff_options);
gpr_mu_init(&c->mu);
return grpc_subchannel_index_register(key, c);
@@ -368,11 +387,11 @@ grpc_subchannel* grpc_subchannel_create(grpc_connector* connector,
static void continue_connect_locked(grpc_subchannel* c) {
grpc_connect_in_args args;
-
args.interested_parties = c->pollset_set;
- args.deadline = c->backoff_result.current_deadline;
+ const grpc_millis min_deadline =
+ c->min_connect_timeout_ms + grpc_core::ExecCtx::Get()->Now();
+ args.deadline = std::max(c->next_attempt_deadline, min_deadline);
args.channel_args = c->args;
-
grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_CONNECTING,
GRPC_ERROR_NONE, "state_change");
grpc_connector_connect(c->connector, &args, &c->connecting_result,
@@ -416,7 +435,7 @@ static void on_alarm(void* arg, grpc_error* error) {
}
if (error == GRPC_ERROR_NONE) {
gpr_log(GPR_INFO, "Failed to connect to channel, retrying");
- c->backoff_result = grpc_backoff_step(&c->backoff_state);
+ c->next_attempt_deadline = c->backoff->Step();
continue_connect_locked(c);
gpr_mu_unlock(&c->mu);
} else {
@@ -452,22 +471,20 @@ static void maybe_start_connecting_locked(grpc_subchannel* c) {
if (!c->backoff_begun) {
c->backoff_begun = true;
- c->backoff_result = grpc_backoff_begin(&c->backoff_state);
+ c->next_attempt_deadline = c->backoff->Begin();
continue_connect_locked(c);
} else {
GPR_ASSERT(!c->have_alarm);
c->have_alarm = true;
const grpc_millis time_til_next =
- c->backoff_result.next_attempt_start_time -
- grpc_core::ExecCtx::Get()->Now();
+ c->next_attempt_deadline - grpc_core::ExecCtx::Get()->Now();
if (time_til_next <= 0) {
gpr_log(GPR_INFO, "Retry immediately");
} else {
gpr_log(GPR_INFO, "Retry in %" PRIdPTR " milliseconds", time_til_next);
}
GRPC_CLOSURE_INIT(&c->on_alarm, on_alarm, c, grpc_schedule_on_exec_ctx);
- grpc_timer_init(&c->alarm, c->backoff_result.next_attempt_start_time,
- &c->on_alarm);
+ grpc_timer_init(&c->alarm, c->next_attempt_deadline, &c->on_alarm);
}
}
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
index c13c4056c1..5ee1f08e61 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
@@ -79,7 +79,9 @@ static int g_default_server_keepalive_time_ms =
DEFAULT_SERVER_KEEPALIVE_TIME_MS;
static int g_default_server_keepalive_timeout_ms =
DEFAULT_SERVER_KEEPALIVE_TIMEOUT_MS;
-static bool g_default_keepalive_permit_without_calls =
+static bool g_default_client_keepalive_permit_without_calls =
+ DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS;
+static bool g_default_server_keepalive_permit_without_calls =
DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS;
static int g_default_min_sent_ping_interval_without_data_ms =
@@ -346,6 +348,8 @@ static void init_transport(grpc_chttp2_transport* t,
t->keepalive_timeout = g_default_client_keepalive_timeout_ms == INT_MAX
? GRPC_MILLIS_INF_FUTURE
: g_default_client_keepalive_timeout_ms;
+ t->keepalive_permit_without_calls =
+ g_default_client_keepalive_permit_without_calls;
} else {
t->keepalive_time = g_default_server_keepalive_time_ms == INT_MAX
? GRPC_MILLIS_INF_FUTURE
@@ -353,8 +357,9 @@ static void init_transport(grpc_chttp2_transport* t,
t->keepalive_timeout = g_default_server_keepalive_timeout_ms == INT_MAX
? GRPC_MILLIS_INF_FUTURE
: g_default_server_keepalive_timeout_ms;
+ t->keepalive_permit_without_calls =
+ g_default_server_keepalive_permit_without_calls;
}
- t->keepalive_permit_without_calls = g_default_keepalive_permit_without_calls;
t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY;
@@ -2550,7 +2555,9 @@ void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args,
for (i = 0; i < args->num_args; i++) {
if (0 == strcmp(args->args[i].key, GRPC_ARG_KEEPALIVE_TIME_MS)) {
const int value = grpc_channel_arg_get_integer(
- &args->args[i], {g_default_client_keepalive_time_ms, 1, INT_MAX});
+ &args->args[i], {is_client ? g_default_client_keepalive_time_ms
+ : g_default_server_keepalive_time_ms,
+ 1, INT_MAX});
if (is_client) {
g_default_client_keepalive_time_ms = value;
} else {
@@ -2559,8 +2566,9 @@ void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args,
} else if (0 ==
strcmp(args->args[i].key, GRPC_ARG_KEEPALIVE_TIMEOUT_MS)) {
const int value = grpc_channel_arg_get_integer(
- &args->args[i],
- {g_default_client_keepalive_timeout_ms, 0, INT_MAX});
+ &args->args[i], {is_client ? g_default_client_keepalive_timeout_ms
+ : g_default_server_keepalive_timeout_ms,
+ 0, INT_MAX});
if (is_client) {
g_default_client_keepalive_timeout_ms = value;
} else {
@@ -2568,10 +2576,16 @@ void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args,
}
} else if (0 == strcmp(args->args[i].key,
GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)) {
- g_default_keepalive_permit_without_calls =
- (uint32_t)grpc_channel_arg_get_integer(
- &args->args[i],
- {g_default_keepalive_permit_without_calls, 0, 1});
+ const bool value = (uint32_t)grpc_channel_arg_get_integer(
+ &args->args[i],
+ {is_client ? g_default_client_keepalive_permit_without_calls
+ : g_default_server_keepalive_timeout_ms,
+ 0, 1});
+ if (is_client) {
+ g_default_client_keepalive_permit_without_calls = value;
+ } else {
+ g_default_server_keepalive_permit_without_calls = value;
+ }
} else if (0 ==
strcmp(args->args[i].key, GRPC_ARG_HTTP2_MAX_PING_STRIKES)) {
g_default_max_ping_strikes = grpc_channel_arg_get_integer(
diff --git a/src/core/lib/backoff/backoff.cc b/src/core/lib/backoff/backoff.cc
index da3b9b1b2d..41f625a636 100644
--- a/src/core/lib/backoff/backoff.cc
+++ b/src/core/lib/backoff/backoff.cc
@@ -18,61 +18,53 @@
#include "src/core/lib/backoff/backoff.h"
+#include <algorithm>
+
#include <grpc/support/useful.h>
-void grpc_backoff_init(grpc_backoff* backoff, grpc_millis initial_backoff,
- double multiplier, double jitter,
- grpc_millis min_connect_timeout,
- grpc_millis max_backoff) {
- backoff->initial_backoff = initial_backoff;
- backoff->multiplier = multiplier;
- backoff->jitter = jitter;
- backoff->min_connect_timeout = min_connect_timeout;
- backoff->max_backoff = max_backoff;
- backoff->rng_state = (uint32_t)gpr_now(GPR_CLOCK_REALTIME).tv_nsec;
-}
+namespace grpc_core {
-grpc_backoff_result grpc_backoff_begin(grpc_backoff* backoff) {
- backoff->current_backoff = backoff->initial_backoff;
- const grpc_millis initial_timeout =
- GPR_MAX(backoff->initial_backoff, backoff->min_connect_timeout);
- const grpc_millis now = grpc_core::ExecCtx::Get()->Now();
- const grpc_backoff_result result = {now + initial_timeout,
- now + backoff->current_backoff};
- return result;
-}
+namespace {
-/* Generate a random number between 0 and 1. */
-static double generate_uniform_random_number(uint32_t* rng_state) {
- *rng_state = (1103515245 * *rng_state + 12345) % ((uint32_t)1 << 31);
- return *rng_state / (double)((uint32_t)1 << 31);
+/* Generate a random number between 0 and 1. We roll our own RNG because seeding
+ * rand() modifies a global variable we have no control over. */
+double generate_uniform_random_number(uint32_t* rng_state) {
+ constexpr uint32_t two_raise_31 = uint32_t(1) << 31;
+ *rng_state = (1103515245 * *rng_state + 12345) % two_raise_31;
+ return *rng_state / static_cast<double>(two_raise_31);
}
-static double generate_uniform_random_number_between(uint32_t* rng_state,
- double a, double b) {
+double generate_uniform_random_number_between(uint32_t* rng_state, double a,
+ double b) {
if (a == b) return a;
if (a > b) GPR_SWAP(double, a, b); // make sure a < b
const double range = b - a;
return a + generate_uniform_random_number(rng_state) * range;
}
+} // namespace
-grpc_backoff_result grpc_backoff_step(grpc_backoff* backoff) {
- backoff->current_backoff = (grpc_millis)(GPR_MIN(
- backoff->current_backoff * backoff->multiplier, backoff->max_backoff));
- const double jitter = generate_uniform_random_number_between(
- &backoff->rng_state, -backoff->jitter * backoff->current_backoff,
- backoff->jitter * backoff->current_backoff);
- const grpc_millis current_timeout =
- GPR_MAX((grpc_millis)(backoff->current_backoff + jitter),
- backoff->min_connect_timeout);
- const grpc_millis next_timeout = GPR_MIN(
- (grpc_millis)(backoff->current_backoff + jitter), backoff->max_backoff);
- const grpc_millis now = grpc_core::ExecCtx::Get()->Now();
- const grpc_backoff_result result = {now + current_timeout,
- now + next_timeout};
- return result;
+BackOff::BackOff(const Options& options) : options_(options) {
+ rng_state_ = static_cast<uint32_t>(gpr_now(GPR_CLOCK_REALTIME).tv_nsec);
+}
+
+grpc_millis BackOff::Begin() {
+ current_backoff_ = options_.initial_backoff();
+ return current_backoff_ + grpc_core::ExecCtx::Get()->Now();
}
-void grpc_backoff_reset(grpc_backoff* backoff) {
- backoff->current_backoff = backoff->initial_backoff;
+grpc_millis BackOff::Step() {
+ current_backoff_ =
+ (grpc_millis)(std::min(current_backoff_ * options_.multiplier(),
+ (double)options_.max_backoff()));
+ const double jitter = generate_uniform_random_number_between(
+ &rng_state_, -options_.jitter() * current_backoff_,
+ options_.jitter() * current_backoff_);
+ const grpc_millis next_timeout = (grpc_millis)(current_backoff_ + jitter);
+ return next_timeout + grpc_core::ExecCtx::Get()->Now();
}
+
+void BackOff::Reset() { current_backoff_ = options_.initial_backoff(); }
+
+void BackOff::SetRandomSeed(uint32_t seed) { rng_state_ = seed; }
+
+} // namespace grpc_core
diff --git a/src/core/lib/backoff/backoff.h b/src/core/lib/backoff/backoff.h
index f61d14ec95..84ef9b82e4 100644
--- a/src/core/lib/backoff/backoff.h
+++ b/src/core/lib/backoff/backoff.h
@@ -21,53 +21,69 @@
#include "src/core/lib/iomgr/exec_ctx.h"
-typedef struct {
- /// const: how long to wait after the first failure before retrying
- grpc_millis initial_backoff;
-
- /// const: factor with which to multiply backoff after a failed retry
- double multiplier;
-
- /// const: amount to randomize backoffs
- double jitter;
-
- /// const: minimum time between retries
- grpc_millis min_connect_timeout;
-
- /// const: maximum time between retries
- grpc_millis max_backoff;
-
+namespace grpc_core {
+
+/// Implementation of the backoff mechanism described in
+/// doc/connection-backoff.md
+class BackOff {
+ public:
+ class Options;
+
+ /// Initialize backoff machinery - does not need to be destroyed
+ explicit BackOff(const Options& options);
+
+ /// Begin retry loop: returns the deadline to be used for the next attempt,
+ /// following the backoff strategy.
+ grpc_millis Begin();
+ /// Step a retry loop: returns the deadline to be used for the next attempt,
+ /// following the backoff strategy.
+ grpc_millis Step();
+ /// Reset the backoff, so the next grpc_backoff_step will be a
+ /// grpc_backoff_begin.
+ void Reset();
+
+ void SetRandomSeed(unsigned int seed);
+
+ class Options {
+ public:
+ Options& set_initial_backoff(grpc_millis initial_backoff) {
+ initial_backoff_ = initial_backoff;
+ return *this;
+ }
+ Options& set_multiplier(double multiplier) {
+ multiplier_ = multiplier;
+ return *this;
+ }
+ Options& set_jitter(double jitter) {
+ jitter_ = jitter;
+ return *this;
+ }
+ Options& set_max_backoff(grpc_millis max_backoff) {
+ max_backoff_ = max_backoff;
+ return *this;
+ }
+ /// how long to wait after the first failure before retrying
+ grpc_millis initial_backoff() const { return initial_backoff_; }
+ /// factor with which to multiply backoff after a failed retry
+ double multiplier() const { return multiplier_; }
+ /// amount to randomize backoffs
+ double jitter() const { return jitter_; }
+ /// maximum time between retries
+ grpc_millis max_backoff() const { return max_backoff_; }
+
+ private:
+ grpc_millis initial_backoff_;
+ double multiplier_;
+ double jitter_;
+ grpc_millis max_backoff_;
+ }; // class Options
+
+ private:
+ const Options options_;
/// current delay before retries
- grpc_millis current_backoff;
-
- /// random number generator
- uint32_t rng_state;
-} grpc_backoff;
-
-typedef struct {
- /// Deadline to be used for the current attempt.
- grpc_millis current_deadline;
-
- /// Deadline to be used for the next attempt, following the backoff strategy.
- grpc_millis next_attempt_start_time;
-} grpc_backoff_result;
-
-/// Initialize backoff machinery - does not need to be destroyed
-void grpc_backoff_init(grpc_backoff* backoff, grpc_millis initial_backoff,
- double multiplier, double jitter,
- grpc_millis min_connect_timeout,
- grpc_millis max_backoff);
-
-/// Begin retry loop: returns the deadlines to be used for the current attempt
-/// and the subsequent retry, if any.
-grpc_backoff_result grpc_backoff_begin(grpc_backoff* backoff);
-
-/// Step a retry loop: returns the deadlines to be used for the current attempt
-/// and the subsequent retry, if any.
-grpc_backoff_result grpc_backoff_step(grpc_backoff* backoff);
-
-/// Reset the backoff, so the next grpc_backoff_step will be a
-/// grpc_backoff_begin.
-void grpc_backoff_reset(grpc_backoff* backoff);
+ grpc_millis current_backoff_;
+ uint32_t rng_state_;
+};
+} // namespace grpc_core
#endif /* GRPC_CORE_LIB_BACKOFF_BACKOFF_H */
diff --git a/src/core/lib/iomgr/error.cc b/src/core/lib/iomgr/error.cc
index 42cd7c455d..67c3caf5ee 100644
--- a/src/core/lib/iomgr/error.cc
+++ b/src/core/lib/iomgr/error.cc
@@ -749,7 +749,7 @@ const char* grpc_error_string(grpc_error* err) {
if (!gpr_atm_rel_cas(&err->atomics.error_string, 0, (gpr_atm)out)) {
gpr_free(out);
- out = (char*)gpr_atm_no_barrier_load(&err->atomics.error_string);
+ out = (char*)gpr_atm_acq_load(&err->atomics.error_string);
}
GPR_TIMER_END("grpc_error_string", 0);
diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h
index 4759ee0791..8c72a439f6 100644
--- a/src/core/lib/iomgr/error.h
+++ b/src/core/lib/iomgr/error.h
@@ -165,6 +165,8 @@ void grpc_error_unref(grpc_error* err);
grpc_error* grpc_error_set_int(grpc_error* src, grpc_error_ints which,
intptr_t value) GRPC_MUST_USE_RESULT;
bool grpc_error_get_int(grpc_error* error, grpc_error_ints which, intptr_t* p);
+/// This call takes ownership of the slice; the error is responsible for
+/// eventually unref-ing it.
grpc_error* grpc_error_set_str(grpc_error* src, grpc_error_strs which,
grpc_slice str) GRPC_MUST_USE_RESULT;
/// Returns false if the specified string is not set.
@@ -174,7 +176,8 @@ bool grpc_error_get_str(grpc_error* error, grpc_error_strs which,
/// Add a child error: an error that is believed to have contributed to this
/// error occurring. Allows root causing high level errors from lower level
-/// errors that contributed to them.
+/// errors that contributed to them. The src error takes ownership of the
+/// child error.
grpc_error* grpc_error_add_child(grpc_error* src,
grpc_error* child) GRPC_MUST_USE_RESULT;
grpc_error* grpc_os_error(const char* file, int line, int err,
diff --git a/src/core/lib/iomgr/ev_epoll1_linux.cc b/src/core/lib/iomgr/ev_epoll1_linux.cc
index ae9d47ece5..1ab7e516de 100644
--- a/src/core/lib/iomgr/ev_epoll1_linux.cc
+++ b/src/core/lib/iomgr/ev_epoll1_linux.cc
@@ -1232,8 +1232,6 @@ const grpc_event_engine_vtable* grpc_init_epoll1_linux(bool explicit_request) {
/* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return
* NULL */
const grpc_event_engine_vtable* grpc_init_epoll1_linux(bool explicit_request) {
- gpr_log(GPR_ERROR,
- "Skipping epoll1 becuase GRPC_LINUX_EPOLL is not defined.");
return nullptr;
}
#endif /* defined(GRPC_POSIX_SOCKET) */
diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc
index b2817156a8..5f5f45a7a5 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.cc
+++ b/src/core/lib/iomgr/ev_epollex_linux.cc
@@ -1449,8 +1449,6 @@ const grpc_event_engine_vtable* grpc_init_epollex_linux(
* NULL */
const grpc_event_engine_vtable* grpc_init_epollex_linux(
bool explicitly_requested) {
- gpr_log(GPR_ERROR,
- "Skipping epollex becuase GRPC_LINUX_EPOLL is not defined.");
return nullptr;
}
#endif /* defined(GRPC_POSIX_SOCKET) */
diff --git a/src/core/lib/iomgr/ev_epollsig_linux.cc b/src/core/lib/iomgr/ev_epollsig_linux.cc
index 7a8962f4a8..8072a6cbed 100644
--- a/src/core/lib/iomgr/ev_epollsig_linux.cc
+++ b/src/core/lib/iomgr/ev_epollsig_linux.cc
@@ -1732,8 +1732,6 @@ const grpc_event_engine_vtable* grpc_init_epollsig_linux(
* NULL */
const grpc_event_engine_vtable* grpc_init_epollsig_linux(
bool explicit_request) {
- gpr_log(GPR_ERROR,
- "Skipping epollsig becuase GRPC_LINUX_EPOLL is not defined.");
return nullptr;
}
#endif /* defined(GRPC_POSIX_SOCKET) */
diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc
index 53de94fb6e..7ea1dfaa80 100644
--- a/src/core/lib/iomgr/ev_poll_posix.cc
+++ b/src/core/lib/iomgr/ev_poll_posix.cc
@@ -71,6 +71,7 @@ struct grpc_fd {
int shutdown;
int closed;
int released;
+ gpr_atm pollhup;
grpc_error* shutdown_error;
/* The watcher list.
@@ -242,7 +243,7 @@ struct grpc_pollset_set {
typedef struct poll_result {
gpr_refcount refcount;
- cv_node* watchers;
+ grpc_cv_node* watchers;
int watchcount;
struct pollfd* fds;
nfds_t nfds;
@@ -273,7 +274,7 @@ typedef struct poll_hash_table {
} poll_hash_table;
poll_hash_table poll_cache;
-cv_fd_table g_cvfds;
+grpc_cv_fd_table g_cvfds;
/*******************************************************************************
* fd_posix.c
@@ -335,6 +336,7 @@ static grpc_fd* fd_create(int fd, const char* name) {
r->on_done_closure = nullptr;
r->closed = 0;
r->released = 0;
+ gpr_atm_no_barrier_store(&r->pollhup, 0);
r->read_notifier_pollset = nullptr;
char* name2;
@@ -950,7 +952,8 @@ static grpc_error* pollset_work(grpc_pollset* pollset,
pfds[0].events = POLLIN;
pfds[0].revents = 0;
for (i = 0; i < pollset->fd_count; i++) {
- if (fd_is_orphaned(pollset->fds[i])) {
+ if (fd_is_orphaned(pollset->fds[i]) ||
+ gpr_atm_no_barrier_load(&pollset->fds[i]->pollhup) == 1) {
GRPC_FD_UNREF(pollset->fds[i], "multipoller");
} else {
pollset->fds[fd_count++] = pollset->fds[i];
@@ -1017,6 +1020,12 @@ static grpc_error* pollset_work(grpc_pollset* pollset,
pfds[i].fd, (pfds[i].revents & POLLIN_CHECK) != 0,
(pfds[i].revents & POLLOUT_CHECK) != 0, pfds[i].revents);
}
+ /* This is a mitigation to prevent poll() from spinning on a
+ ** POLLHUP https://github.com/grpc/grpc/pull/13665
+ */
+ if (pfds[i].revents & POLLHUP) {
+ gpr_atm_no_barrier_store(&watchers[i].fd->pollhup, 1);
+ }
fd_end_poll(&watchers[i], pfds[i].revents & POLLIN_CHECK,
pfds[i].revents & POLLOUT_CHECK, pollset);
}
@@ -1435,7 +1444,7 @@ static void decref_poll_result(poll_result* res) {
}
}
-void remove_cvn(cv_node** head, cv_node* target) {
+void remove_cvn(grpc_cv_node** head, grpc_cv_node* target) {
if (target->next) {
target->next->prev = target->prev;
}
@@ -1460,7 +1469,7 @@ static void run_poll(void* args) {
result->completed = 1;
result->retval = retval;
result->err = errno;
- cv_node* watcher = result->watchers;
+ grpc_cv_node* watcher = result->watchers;
while (watcher) {
gpr_cv_signal(watcher->cv);
watcher = watcher->next;
@@ -1494,17 +1503,17 @@ static void run_poll(void* args) {
static int cvfd_poll(struct pollfd* fds, nfds_t nfds, int timeout) {
unsigned int i;
int res, idx;
- cv_node* pollcv;
+ grpc_cv_node* pollcv;
int skip_poll = 0;
nfds_t nsockfds = 0;
poll_result* result = nullptr;
gpr_mu_lock(&g_cvfds.mu);
- pollcv = (cv_node*)gpr_malloc(sizeof(cv_node));
+ pollcv = (grpc_cv_node*)gpr_malloc(sizeof(grpc_cv_node));
pollcv->next = nullptr;
gpr_cv pollcv_cv;
gpr_cv_init(&pollcv_cv);
pollcv->cv = &pollcv_cv;
- cv_node* fd_cvs = (cv_node*)gpr_malloc(nfds * sizeof(cv_node));
+ grpc_cv_node* fd_cvs = (grpc_cv_node*)gpr_malloc(nfds * sizeof(grpc_cv_node));
for (i = 0; i < nfds; i++) {
fds[i].revents = 0;
@@ -1600,7 +1609,8 @@ static void global_cv_fd_table_init() {
gpr_cv_init(&g_cvfds.shutdown_cv);
gpr_ref_init(&g_cvfds.pollcount, 1);
g_cvfds.size = CV_DEFAULT_TABLE_SIZE;
- g_cvfds.cvfds = (fd_node*)gpr_malloc(sizeof(fd_node) * CV_DEFAULT_TABLE_SIZE);
+ g_cvfds.cvfds =
+ (grpc_fd_node*)gpr_malloc(sizeof(grpc_fd_node) * CV_DEFAULT_TABLE_SIZE);
g_cvfds.free_fds = nullptr;
thread_grace = gpr_time_from_millis(POLLCV_THREAD_GRACE_MS, GPR_TIMESPAN);
for (int i = 0; i < CV_DEFAULT_TABLE_SIZE; i++) {
diff --git a/src/core/lib/iomgr/tcp_client_posix.cc b/src/core/lib/iomgr/tcp_client_posix.cc
index 24ccab14b2..8cd5f8d618 100644
--- a/src/core/lib/iomgr/tcp_client_posix.cc
+++ b/src/core/lib/iomgr/tcp_client_posix.cc
@@ -212,6 +212,9 @@ finish:
fd = nullptr;
}
done = (--ac->refs == 0);
+ // Create a copy of the data from "ac" to be accessed after the unlock, as
+ // "ac" and its contents may be deallocated by the time they are read.
+ const grpc_slice addr_str_slice = grpc_slice_from_copied_string(ac->addr_str);
gpr_mu_unlock(&ac->mu);
if (error != GRPC_ERROR_NONE) {
char* error_descr;
@@ -225,9 +228,13 @@ finish:
gpr_free(error_descr);
gpr_free(desc);
error = grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS,
- grpc_slice_from_copied_string(ac->addr_str));
+ addr_str_slice /* takes ownership */);
+ } else {
+ grpc_slice_unref(addr_str_slice);
}
if (done) {
+ // This is safe even outside the lock, because "done", the sentinel, is
+ // populated *inside* the lock.
gpr_mu_destroy(&ac->mu);
gpr_free(ac->addr_str);
grpc_channel_args_destroy(ac->channel_args);
diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc
index 55e0b165ec..4a97f3353d 100644
--- a/src/core/lib/iomgr/udp_server.cc
+++ b/src/core/lib/iomgr/udp_server.cc
@@ -72,6 +72,7 @@ struct grpc_udp_listener {
grpc_udp_server_read_cb read_cb;
grpc_udp_server_write_cb write_cb;
grpc_udp_server_orphan_cb orphan_cb;
+ grpc_udp_server_start_cb start_cb;
// To be scheduled on another thread to actually read/write.
grpc_closure do_read_closure;
grpc_closure do_write_closure;
@@ -353,7 +354,7 @@ static void do_read(void* arg, grpc_error* error) {
* read lock if available. */
gpr_mu_lock(&sp->server->mu);
/* Tell the registered callback that data is available to read. */
- if (!sp->already_shutdown && sp->read_cb(sp->emfd, sp->server->user_data)) {
+ if (!sp->already_shutdown && sp->read_cb(sp->emfd)) {
/* There maybe more packets to read. Schedule read_more_cb_ closure to run
* after finishing this event loop. */
GRPC_CLOSURE_SCHED(&sp->do_read_closure, GRPC_ERROR_NONE);
@@ -383,7 +384,7 @@ static void on_read(void* arg, grpc_error* error) {
/* Read once. If there is more data to read, off load the work to another
* thread to finish. */
GPR_ASSERT(sp->read_cb);
- if (sp->read_cb(sp->emfd, sp->server->user_data)) {
+ if (sp->read_cb(sp->emfd)) {
/* There maybe more packets to read. Schedule read_more_cb_ closure to run
* after finishing this event loop. */
GRPC_CLOSURE_INIT(&sp->do_read_closure, do_read, arg,
@@ -411,7 +412,7 @@ void fd_notify_on_write_wrapper(void* arg, grpc_error* error) {
static void do_write(void* arg, grpc_error* error) {
grpc_udp_listener* sp = reinterpret_cast<grpc_udp_listener*>(arg);
- gpr_mu_lock(&(sp->server->mu));
+ gpr_mu_lock(&sp->server->mu);
if (sp->already_shutdown) {
// If fd has been shutdown, don't write any more and re-arm notification.
grpc_fd_notify_on_write(sp->emfd, &sp->write_closure);
@@ -429,7 +430,7 @@ static void do_write(void* arg, grpc_error* error) {
static void on_write(void* arg, grpc_error* error) {
grpc_udp_listener* sp = (grpc_udp_listener*)arg;
- gpr_mu_lock(&(sp->server->mu));
+ gpr_mu_lock(&sp->server->mu);
if (error != GRPC_ERROR_NONE) {
if (0 == --sp->server->active_ports && sp->server->shutdown) {
gpr_mu_unlock(&sp->server->mu);
@@ -450,6 +451,7 @@ static void on_write(void* arg, grpc_error* error) {
static int add_socket_to_server(grpc_udp_server* s, int fd,
const grpc_resolved_address* addr,
+ grpc_udp_server_start_cb start_cb,
grpc_udp_server_read_cb read_cb,
grpc_udp_server_write_cb write_cb,
grpc_udp_server_orphan_cb orphan_cb) {
@@ -480,6 +482,7 @@ static int add_socket_to_server(grpc_udp_server* s, int fd,
sp->read_cb = read_cb;
sp->write_cb = write_cb;
sp->orphan_cb = orphan_cb;
+ sp->start_cb = start_cb;
sp->orphan_notified = false;
sp->already_shutdown = false;
GPR_ASSERT(sp->emfd);
@@ -492,6 +495,7 @@ static int add_socket_to_server(grpc_udp_server* s, int fd,
int grpc_udp_server_add_port(grpc_udp_server* s,
const grpc_resolved_address* addr,
+ grpc_udp_server_start_cb start_cb,
grpc_udp_server_read_cb read_cb,
grpc_udp_server_write_cb write_cb,
grpc_udp_server_orphan_cb orphan_cb) {
@@ -541,8 +545,8 @@ int grpc_udp_server_add_port(grpc_udp_server* s,
// TODO(rjshade): Test and propagate the returned grpc_error*:
GRPC_ERROR_UNREF(grpc_create_dualstack_socket_using_factory(
s->socket_factory, addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd));
- allocated_port1 =
- add_socket_to_server(s, fd, addr, read_cb, write_cb, orphan_cb);
+ allocated_port1 = add_socket_to_server(s, fd, addr, start_cb, read_cb,
+ write_cb, orphan_cb);
if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) {
goto done;
}
@@ -565,7 +569,7 @@ int grpc_udp_server_add_port(grpc_udp_server* s,
addr = &addr4_copy;
}
allocated_port2 =
- add_socket_to_server(s, fd, addr, read_cb, write_cb, orphan_cb);
+ add_socket_to_server(s, fd, addr, start_cb, read_cb, write_cb, orphan_cb);
done:
gpr_free(allocated_addr);
@@ -587,6 +591,7 @@ int grpc_udp_server_get_fd(grpc_udp_server* s, unsigned port_index) {
void grpc_udp_server_start(grpc_udp_server* s, grpc_pollset** pollsets,
size_t pollset_count, void* user_data) {
+ gpr_log(GPR_DEBUG, "grpc_udp_server_start");
size_t i;
gpr_mu_lock(&s->mu);
grpc_udp_listener* sp;
@@ -596,6 +601,7 @@ void grpc_udp_server_start(grpc_udp_server* s, grpc_pollset** pollsets,
sp = s->head;
while (sp != nullptr) {
+ sp->start_cb(sp->emfd, sp->server->user_data);
for (i = 0; i < pollset_count; i++) {
grpc_pollset_add_fd(pollsets[i], sp->emfd);
}
diff --git a/src/core/lib/iomgr/udp_server.h b/src/core/lib/iomgr/udp_server.h
index 02e3acb7f5..a469ab9be5 100644
--- a/src/core/lib/iomgr/udp_server.h
+++ b/src/core/lib/iomgr/udp_server.h
@@ -30,9 +30,12 @@ struct grpc_server;
/* Forward decl of grpc_udp_server */
typedef struct grpc_udp_server grpc_udp_server;
+/* Called when grpc server starts to listening on the grpc_fd. */
+typedef void (*grpc_udp_server_start_cb)(grpc_fd* emfd, void* user_data);
+
/* Called when data is available to read from the socket.
* Return true if there is more data to read from fd. */
-typedef bool (*grpc_udp_server_read_cb)(grpc_fd* emfd, void* user_data);
+typedef bool (*grpc_udp_server_read_cb)(grpc_fd* emfd);
/* Called when the socket is writeable. The given closure should be scheduled
* when the socket becomes blocked next time. */
@@ -65,6 +68,7 @@ int grpc_udp_server_get_fd(grpc_udp_server* s, unsigned port_index);
all of the multiple socket port matching logic in one place */
int grpc_udp_server_add_port(grpc_udp_server* s,
const grpc_resolved_address* addr,
+ grpc_udp_server_start_cb start_cb,
grpc_udp_server_read_cb read_cb,
grpc_udp_server_write_cb write_cb,
grpc_udp_server_orphan_cb orphan_cb);
diff --git a/src/core/lib/iomgr/wakeup_fd_cv.cc b/src/core/lib/iomgr/wakeup_fd_cv.cc
index 5c1f16d3fc..c785114212 100644
--- a/src/core/lib/iomgr/wakeup_fd_cv.cc
+++ b/src/core/lib/iomgr/wakeup_fd_cv.cc
@@ -34,7 +34,7 @@
#define MAX_TABLE_RESIZE 256
-extern cv_fd_table g_cvfds;
+extern grpc_cv_fd_table g_cvfds;
static grpc_error* cv_fd_init(grpc_wakeup_fd* fd_info) {
unsigned int i, newsize;
@@ -42,8 +42,8 @@ static grpc_error* cv_fd_init(grpc_wakeup_fd* fd_info) {
gpr_mu_lock(&g_cvfds.mu);
if (!g_cvfds.free_fds) {
newsize = GPR_MIN(g_cvfds.size * 2, g_cvfds.size + MAX_TABLE_RESIZE);
- g_cvfds.cvfds =
- (fd_node*)gpr_realloc(g_cvfds.cvfds, sizeof(fd_node) * newsize);
+ g_cvfds.cvfds = (grpc_fd_node*)gpr_realloc(g_cvfds.cvfds,
+ sizeof(grpc_fd_node) * newsize);
for (i = g_cvfds.size; i < newsize; i++) {
g_cvfds.cvfds[i].is_set = 0;
g_cvfds.cvfds[i].cvs = nullptr;
@@ -64,7 +64,7 @@ static grpc_error* cv_fd_init(grpc_wakeup_fd* fd_info) {
}
static grpc_error* cv_fd_wakeup(grpc_wakeup_fd* fd_info) {
- cv_node* cvn;
+ grpc_cv_node* cvn;
gpr_mu_lock(&g_cvfds.mu);
g_cvfds.cvfds[GRPC_FD_TO_IDX(fd_info->read_fd)].is_set = 1;
cvn = g_cvfds.cvfds[GRPC_FD_TO_IDX(fd_info->read_fd)].cvs;
diff --git a/src/core/lib/iomgr/wakeup_fd_cv.h b/src/core/lib/iomgr/wakeup_fd_cv.h
index 017e41bfa8..399620af76 100644
--- a/src/core/lib/iomgr/wakeup_fd_cv.h
+++ b/src/core/lib/iomgr/wakeup_fd_cv.h
@@ -40,27 +40,27 @@
#define GRPC_FD_TO_IDX(fd) (-(fd)-1)
#define GRPC_IDX_TO_FD(idx) (-(idx)-1)
-typedef struct cv_node {
+typedef struct grpc_cv_node {
gpr_cv* cv;
- struct cv_node* next;
- struct cv_node* prev;
-} cv_node;
+ struct grpc_cv_node* next;
+ struct grpc_cv_node* prev;
+} grpc_cv_node;
-typedef struct fd_node {
+typedef struct grpc_fd_node {
int is_set;
- cv_node* cvs;
- struct fd_node* next_free;
-} fd_node;
+ grpc_cv_node* cvs;
+ struct grpc_fd_node* next_free;
+} grpc_fd_node;
-typedef struct cv_fd_table {
+typedef struct grpc_cv_fd_table {
gpr_mu mu;
gpr_refcount pollcount;
gpr_cv shutdown_cv;
- fd_node* cvfds;
- fd_node* free_fds;
+ grpc_fd_node* cvfds;
+ grpc_fd_node* free_fds;
unsigned int size;
grpc_poll_function_type poll;
-} cv_fd_table;
+} grpc_cv_fd_table;
extern const grpc_wakeup_fd_vtable grpc_cv_wakeup_fd_vtable;
diff --git a/src/core/lib/support/debug_location.h b/src/core/lib/support/debug_location.h
index 0939da595d..9b3f9220fc 100644
--- a/src/core/lib/support/debug_location.h
+++ b/src/core/lib/support/debug_location.h
@@ -36,7 +36,7 @@ class DebugLocation {
const char* file_;
const int line_;
};
-#define DEBUG_LOCATION DebugLocation(__FILE__, __LINE__)
+#define DEBUG_LOCATION ::grpc_core::DebugLocation(__FILE__, __LINE__)
#else
class DebugLocation {
public:
@@ -44,7 +44,7 @@ class DebugLocation {
const char* file() const { return nullptr; }
int line() const { return -1; }
};
-#define DEBUG_LOCATION DebugLocation()
+#define DEBUG_LOCATION ::grpc_core::DebugLocation()
#endif
} // namespace grpc_core