diff options
author | Craig Tiller <ctiller@google.com> | 2017-10-06 08:59:22 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2017-10-06 08:59:22 -0700 |
commit | 527253f27de6e313e2b27893152c80a15a728f6a (patch) | |
tree | 19eb13c4dad997caf04d201c80e91b65f5be2b35 /src/core/ext | |
parent | 40928b6173453c301199e1c217836412604ef03a (diff) | |
parent | abecd39477e901ad060300a5df67e87469ddbb54 (diff) |
Merge branch 'flowctl+millis' into qps_failya
Diffstat (limited to 'src/core/ext')
24 files changed, 403 insertions, 463 deletions
diff --git a/src/core/ext/filters/client_channel/channel_connectivity.cc b/src/core/ext/filters/client_channel/channel_connectivity.cc index a05a11dad1..31a8fc39ce 100644 --- a/src/core/ext/filters/client_channel/channel_connectivity.cc +++ b/src/core/ext/filters/client_channel/channel_connectivity.cc @@ -188,8 +188,8 @@ static void watcher_timer_init(grpc_exec_ctx *exec_ctx, void *arg, watcher_timer_init_arg *wa = (watcher_timer_init_arg *)arg; grpc_timer_init(exec_ctx, &wa->w->alarm, - gpr_convert_clock_type(wa->deadline, GPR_CLOCK_MONOTONIC), - &wa->w->on_timeout, gpr_now(GPR_CLOCK_MONOTONIC)); + grpc_timespec_to_millis_round_up(wa->deadline), + &wa->w->on_timeout); gpr_free(wa); } diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 8bff7548ac..22c2bc8880 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -71,7 +71,7 @@ typedef enum { typedef struct { gpr_refcount refs; - gpr_timespec timeout; + grpc_millis timeout; wait_for_ready_value wait_for_ready; } method_parameters; @@ -101,17 +101,18 @@ static bool parse_wait_for_ready(grpc_json *field, return true; } -static bool parse_timeout(grpc_json *field, gpr_timespec *timeout) { +static bool parse_timeout(grpc_json *field, grpc_millis *timeout) { if (field->type != GRPC_JSON_STRING) return false; size_t len = strlen(field->value); if (field->value[len - 1] != 's') return false; char *buf = gpr_strdup(field->value); buf[len - 1] = '\0'; // Remove trailing 's'. char *decimal_point = strchr(buf, '.'); + int nanos = 0; if (decimal_point != NULL) { *decimal_point = '\0'; - timeout->tv_nsec = gpr_parse_nonnegative_int(decimal_point + 1); - if (timeout->tv_nsec == -1) { + nanos = gpr_parse_nonnegative_int(decimal_point + 1); + if (nanos == -1) { gpr_free(buf); return false; } @@ -130,24 +131,25 @@ static bool parse_timeout(grpc_json *field, gpr_timespec *timeout) { gpr_free(buf); return false; } - timeout->tv_nsec *= multiplier; + nanos *= multiplier; } - timeout->tv_sec = gpr_parse_nonnegative_int(buf); + int seconds = gpr_parse_nonnegative_int(buf); gpr_free(buf); - if (timeout->tv_sec == -1) return false; + if (seconds == -1) return false; + *timeout = seconds * GPR_MS_PER_SEC + nanos / GPR_NS_PER_MS; return true; } static void *method_parameters_create_from_json(const grpc_json *json) { wait_for_ready_value wait_for_ready = WAIT_FOR_READY_UNSET; - gpr_timespec timeout = {0, 0, GPR_TIMESPAN}; + grpc_millis timeout = 0; for (grpc_json *field = json->child; field != NULL; field = field->next) { if (field->key == NULL) continue; if (strcmp(field->key, "waitForReady") == 0) { if (wait_for_ready != WAIT_FOR_READY_UNSET) return NULL; // Duplicate. if (!parse_wait_for_ready(field, &wait_for_ready)) return NULL; } else if (strcmp(field->key, "timeout") == 0) { - if (timeout.tv_sec > 0 || timeout.tv_nsec > 0) return NULL; // Duplicate. + if (timeout > 0) return NULL; // Duplicate. if (!parse_timeout(field, &timeout)) return NULL; } } @@ -826,7 +828,7 @@ typedef struct client_channel_call_data { grpc_slice path; // Request path. gpr_timespec call_start_time; - gpr_timespec deadline; + grpc_millis deadline; gpr_arena *arena; grpc_call_stack *owning_call; grpc_call_combiner *call_combiner; @@ -979,11 +981,11 @@ static void apply_service_config_to_call_locked(grpc_exec_ctx *exec_ctx, // If the deadline from the service config is shorter than the one // from the client API, reset the deadline timer. if (chand->deadline_checking_enabled && - gpr_time_cmp(calld->method_params->timeout, - gpr_time_0(GPR_TIMESPAN)) != 0) { - const gpr_timespec per_method_deadline = - gpr_time_add(calld->call_start_time, calld->method_params->timeout); - if (gpr_time_cmp(per_method_deadline, calld->deadline) < 0) { + calld->method_params->timeout != 0) { + const grpc_millis per_method_deadline = + grpc_timespec_to_millis_round_up(calld->call_start_time) + + calld->method_params->timeout; + if (per_method_deadline < calld->deadline) { calld->deadline = per_method_deadline; grpc_deadline_state_reset(exec_ctx, elem, calld->deadline); } @@ -1422,7 +1424,7 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx, // Initialize data members. calld->path = grpc_slice_ref_internal(args->path); calld->call_start_time = args->start_time; - calld->deadline = gpr_convert_clock_type(args->deadline, GPR_CLOCK_MONOTONIC); + calld->deadline = args->deadline; calld->arena = args->arena; calld->owning_call = args->call_stack; calld->call_combiner = args->call_combiner; diff --git a/src/core/ext/filters/client_channel/connector.h b/src/core/ext/filters/client_channel/connector.h index 79ccb0d9bf..b91c93e446 100644 --- a/src/core/ext/filters/client_channel/connector.h +++ b/src/core/ext/filters/client_channel/connector.h @@ -38,7 +38,7 @@ typedef struct { /** set of pollsets interested in this connection */ grpc_pollset_set *interested_parties; /** deadline for connection */ - gpr_timespec deadline; + grpc_millis deadline; /** channel arguments (to be passed to transport) */ const grpc_channel_args *channel_args; } grpc_connect_in_args; diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index d8e314d1f9..02a983c63c 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -103,6 +103,7 @@ #include "src/core/ext/filters/client_channel/parse_address.h" #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" #include "src/core/ext/filters/client_channel/subchannel_index.h" +#include "src/core/lib/backoff/backoff.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/iomgr/combiner.h" @@ -112,7 +113,6 @@ #include "src/core/lib/slice/slice_hash_table.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" -#include "src/core/lib/support/backoff.h" #include "src/core/lib/support/string.h" #include "src/core/lib/surface/call.h" #include "src/core/lib/surface/channel.h" @@ -397,7 +397,7 @@ typedef struct glb_lb_policy { grpc_slice lb_call_status_details; /** LB call retry backoff state */ - gpr_backoff lb_call_backoff_state; + grpc_backoff lb_call_backoff_state; /** LB call retry timer */ grpc_timer lb_call_retry_timer; @@ -411,7 +411,7 @@ typedef struct glb_lb_policy { * recreated whenever lb_call is replaced. */ grpc_grpclb_client_stats *client_stats; /* Interval and timer for next client load report. */ - gpr_timespec client_stats_report_interval; + grpc_millis client_stats_report_interval; grpc_timer client_load_report_timer; bool client_load_report_timer_pending; bool last_client_load_report_counters_were_zero; @@ -1130,21 +1130,19 @@ static void start_picking_locked(grpc_exec_ctx *exec_ctx, /* start a timer to fall back */ if (glb_policy->lb_fallback_timeout_ms > 0 && glb_policy->serverlist == NULL && !glb_policy->fallback_timer_active) { - gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); - gpr_timespec deadline = gpr_time_add( - now, - gpr_time_from_millis(glb_policy->lb_fallback_timeout_ms, GPR_TIMESPAN)); + grpc_millis deadline = + grpc_exec_ctx_now(exec_ctx) + glb_policy->lb_fallback_timeout_ms; GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_fallback_timer"); GRPC_CLOSURE_INIT(&glb_policy->lb_on_fallback, lb_on_fallback_timer_locked, glb_policy, grpc_combiner_scheduler(glb_policy->base.combiner)); glb_policy->fallback_timer_active = true; grpc_timer_init(exec_ctx, &glb_policy->lb_fallback_timer, deadline, - &glb_policy->lb_on_fallback, now); + &glb_policy->lb_on_fallback); } glb_policy->started_picking = true; - gpr_backoff_reset(&glb_policy->lb_call_backoff_state); + grpc_backoff_reset(&glb_policy->lb_call_backoff_state); query_for_backends_locked(exec_ctx, glb_policy); } @@ -1270,17 +1268,15 @@ static void maybe_restart_lb_call(grpc_exec_ctx *exec_ctx, glb_policy->updating_lb_call = false; } else if (!glb_policy->shutting_down) { /* if we aren't shutting down, restart the LB client call after some time */ - gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); - gpr_timespec next_try = - gpr_backoff_step(&glb_policy->lb_call_backoff_state, now); + grpc_millis next_try = + grpc_backoff_step(exec_ctx, &glb_policy->lb_call_backoff_state); if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { gpr_log(GPR_DEBUG, "Connection to LB server lost (grpclb: %p)...", (void *)glb_policy); - gpr_timespec timeout = gpr_time_sub(next_try, now); - if (gpr_time_cmp(timeout, gpr_time_0(timeout.clock_type)) > 0) { - gpr_log(GPR_DEBUG, - "... retry_timer_active in %" PRId64 ".%09d seconds.", - timeout.tv_sec, timeout.tv_nsec); + grpc_millis timeout = next_try - grpc_exec_ctx_now(exec_ctx); + if (timeout > 0) { + gpr_log(GPR_DEBUG, "... retry_timer_active in %" PRIdPTR "ms.", + timeout); } else { gpr_log(GPR_DEBUG, "... retry_timer_active immediately."); } @@ -1291,7 +1287,7 @@ static void maybe_restart_lb_call(grpc_exec_ctx *exec_ctx, grpc_combiner_scheduler(glb_policy->base.combiner)); glb_policy->retry_timer_active = true; grpc_timer_init(exec_ctx, &glb_policy->lb_call_retry_timer, next_try, - &glb_policy->lb_on_call_retry, now); + &glb_policy->lb_on_call_retry); } GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, "lb_on_server_status_received_locked"); @@ -1302,15 +1298,14 @@ static void send_client_load_report_locked(grpc_exec_ctx *exec_ctx, void *arg, static void schedule_next_client_load_report(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy) { - const gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); - const gpr_timespec next_client_load_report_time = - gpr_time_add(now, glb_policy->client_stats_report_interval); + const grpc_millis next_client_load_report_time = + grpc_exec_ctx_now(exec_ctx) + glb_policy->client_stats_report_interval; GRPC_CLOSURE_INIT(&glb_policy->client_load_report_closure, send_client_load_report_locked, glb_policy, grpc_combiner_scheduler(glb_policy->base.combiner)); grpc_timer_init(exec_ctx, &glb_policy->client_load_report_timer, next_client_load_report_time, - &glb_policy->client_load_report_closure, now); + &glb_policy->client_load_report_closure); } static void client_load_report_done_locked(grpc_exec_ctx *exec_ctx, void *arg, @@ -1404,12 +1399,10 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx, * glb_policy->base.interested_parties, which is comprised of the polling * entities from \a client_channel. */ grpc_slice host = grpc_slice_from_copied_string(glb_policy->server_name); - gpr_timespec deadline = + grpc_millis deadline = glb_policy->lb_call_timeout_ms == 0 - ? gpr_inf_future(GPR_CLOCK_MONOTONIC) - : gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), - gpr_time_from_millis(glb_policy->lb_call_timeout_ms, - GPR_TIMESPAN)); + ? GRPC_MILLIS_INF_FUTURE + : grpc_exec_ctx_now(exec_ctx) + glb_policy->lb_call_timeout_ms; glb_policy->lb_call = grpc_channel_create_pollset_set_call( exec_ctx, glb_policy->lb_channel, NULL, GRPC_PROPAGATE_DEFAULTS, glb_policy->base.interested_parties, @@ -1440,12 +1433,12 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx, lb_on_response_received_locked, glb_policy, grpc_combiner_scheduler(glb_policy->base.combiner)); - gpr_backoff_init(&glb_policy->lb_call_backoff_state, - GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS, - GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER, - GRPC_GRPCLB_RECONNECT_JITTER, - GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS * 1000, - GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000); + grpc_backoff_init(&glb_policy->lb_call_backoff_state, + GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS, + GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER, + GRPC_GRPCLB_RECONNECT_JITTER, + GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS * 1000, + GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000); glb_policy->seen_initial_response = false; glb_policy->last_client_load_report_counters_were_zero = false; @@ -1553,7 +1546,7 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, memset(ops, 0, sizeof(ops)); grpc_op *op = ops; if (glb_policy->lb_response_payload != NULL) { - gpr_backoff_reset(&glb_policy->lb_call_backoff_state); + grpc_backoff_reset(&glb_policy->lb_call_backoff_state); /* Received data from the LB server. Look inside * glb_policy->lb_response_payload, for a serverlist. */ grpc_byte_buffer_reader bbr; @@ -1567,16 +1560,14 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, (response = grpc_grpclb_initial_response_parse(response_slice)) != NULL) { if (response->has_client_stats_report_interval) { - glb_policy->client_stats_report_interval = - gpr_time_max(gpr_time_from_seconds(1, GPR_TIMESPAN), - grpc_grpclb_duration_to_timespec( - &response->client_stats_report_interval)); + glb_policy->client_stats_report_interval = GPR_MAX( + GPR_MS_PER_SEC, grpc_grpclb_duration_to_millis( + &response->client_stats_report_interval)); if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { gpr_log(GPR_INFO, "received initial LB response message; " - "client load reporting interval = %" PRId64 ".%09d sec", - glb_policy->client_stats_report_interval.tv_sec, - glb_policy->client_stats_report_interval.tv_nsec); + "client load reporting interval = %" PRIdPTR " milliseconds", + glb_policy->client_stats_report_interval); } /* take a weak ref (won't prevent calling of \a glb_shutdown() if the * strong ref count goes to zero) to be unref'd in diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc index 8ef6dfc6f4..4d5fb2081c 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.cc @@ -299,13 +299,10 @@ int grpc_grpclb_duration_compare(const grpc_grpclb_duration *lhs, return 0; } -gpr_timespec grpc_grpclb_duration_to_timespec( - grpc_grpclb_duration *duration_pb) { - gpr_timespec duration; - duration.tv_sec = duration_pb->has_seconds ? duration_pb->seconds : 0; - duration.tv_nsec = duration_pb->has_nanos ? duration_pb->nanos : 0; - duration.clock_type = GPR_TIMESPAN; - return duration; +grpc_millis grpc_grpclb_duration_to_millis(grpc_grpclb_duration *duration_pb) { + return (grpc_millis)( + (duration_pb->has_seconds ? duration_pb->seconds : 0) * GPR_MS_PER_SEC + + (duration_pb->has_nanos ? duration_pb->nanos : 0) / GPR_NS_PER_MS); } void grpc_grpclb_initial_response_destroy( diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h index c4a98492c9..56b9c096d0 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h @@ -81,8 +81,7 @@ void grpc_grpclb_destroy_serverlist(grpc_grpclb_serverlist *serverlist); int grpc_grpclb_duration_compare(const grpc_grpclb_duration *lhs, const grpc_grpclb_duration *rhs); -gpr_timespec grpc_grpclb_duration_to_timespec( - grpc_grpclb_duration *duration_pb); +grpc_millis grpc_grpclb_duration_to_millis(grpc_grpclb_duration *duration_pb); /** Destroy \a initial_response */ void grpc_grpclb_initial_response_destroy( diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc index 69f5877b00..5f7ab987cb 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc @@ -32,13 +32,13 @@ #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h" #include "src/core/ext/filters/client_channel/resolver_registry.h" +#include "src/core/lib/backoff/backoff.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/gethostname.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/json/json.h" -#include "src/core/lib/support/backoff.h" #include "src/core/lib/support/env.h" #include "src/core/lib/support/string.h" #include "src/core/lib/transport/service_config.h" @@ -89,7 +89,7 @@ typedef struct { bool have_retry_timer; grpc_timer retry_timer; /** retry backoff state */ - gpr_backoff backoff_state; + grpc_backoff backoff_state; /** currently resolving addresses */ grpc_lb_addresses *lb_addresses; @@ -137,7 +137,7 @@ static void dns_ares_channel_saw_error_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) { ares_dns_resolver *r = (ares_dns_resolver *)resolver; if (!r->resolving) { - gpr_backoff_reset(&r->backoff_state); + grpc_backoff_reset(&r->backoff_state); dns_ares_start_resolving_locked(exec_ctx, r); } } @@ -271,22 +271,20 @@ static void dns_ares_on_resolved_locked(grpc_exec_ctx *exec_ctx, void *arg, } else { const char *msg = grpc_error_string(error); gpr_log(GPR_DEBUG, "dns resolution failed: %s", msg); - gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); - gpr_timespec next_try = gpr_backoff_step(&r->backoff_state, now); - gpr_timespec timeout = gpr_time_sub(next_try, now); + grpc_millis next_try = grpc_backoff_step(exec_ctx, &r->backoff_state); + grpc_millis timeout = next_try - grpc_exec_ctx_now(exec_ctx); gpr_log(GPR_INFO, "dns resolution failed (will retry): %s", grpc_error_string(error)); GPR_ASSERT(!r->have_retry_timer); r->have_retry_timer = true; GRPC_RESOLVER_REF(&r->base, "retry-timer"); - if (gpr_time_cmp(timeout, gpr_time_0(timeout.clock_type)) > 0) { - gpr_log(GPR_DEBUG, "retrying in %" PRId64 ".%09d seconds", timeout.tv_sec, - timeout.tv_nsec); + if (timeout > 0) { + gpr_log(GPR_DEBUG, "retrying in %" PRIdPTR " milliseconds", timeout); } else { gpr_log(GPR_DEBUG, "retrying immediately"); } grpc_timer_init(exec_ctx, &r->retry_timer, next_try, - &r->dns_ares_on_retry_timer_locked, now); + &r->dns_ares_on_retry_timer_locked); } if (r->resolved_result != NULL) { grpc_channel_args_destroy(exec_ctx, r->resolved_result); @@ -307,7 +305,7 @@ static void dns_ares_next_locked(grpc_exec_ctx *exec_ctx, r->next_completion = on_complete; r->target_result = target_result; if (r->resolved_version == 0 && !r->resolving) { - gpr_backoff_reset(&r->backoff_state); + grpc_backoff_reset(&r->backoff_state); dns_ares_start_resolving_locked(exec_ctx, r); } else { dns_ares_maybe_finish_next_locked(exec_ctx, r); @@ -381,11 +379,11 @@ static grpc_resolver *dns_ares_create(grpc_exec_ctx *exec_ctx, grpc_pollset_set_add_pollset_set(exec_ctx, r->interested_parties, args->pollset_set); } - gpr_backoff_init(&r->backoff_state, GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS, - GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER, - GRPC_DNS_RECONNECT_JITTER, - GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS * 1000, - GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000); + grpc_backoff_init(&r->backoff_state, GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS, + GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER, + GRPC_DNS_RECONNECT_JITTER, + GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS * 1000, + GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000); GRPC_CLOSURE_INIT(&r->dns_ares_on_retry_timer_locked, dns_ares_on_retry_timer_locked, r, grpc_combiner_scheduler(r->base.combiner)); diff --git a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc index 1c093d0ad2..e669b6dfc7 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc @@ -27,11 +27,11 @@ #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/resolver_registry.h" +#include "src/core/lib/backoff/backoff.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/timer.h" -#include "src/core/lib/support/backoff.h" #include "src/core/lib/support/env.h" #include "src/core/lib/support/string.h" @@ -70,7 +70,7 @@ typedef struct { grpc_timer retry_timer; grpc_closure on_retry; /** retry backoff state */ - gpr_backoff backoff_state; + grpc_backoff backoff_state; /** currently resolving addresses */ grpc_resolved_addresses *addresses; @@ -113,7 +113,7 @@ static void dns_channel_saw_error_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) { dns_resolver *r = (dns_resolver *)resolver; if (!r->resolving) { - gpr_backoff_reset(&r->backoff_state); + grpc_backoff_reset(&r->backoff_state); dns_start_resolving_locked(exec_ctx, r); } } @@ -126,7 +126,7 @@ static void dns_next_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, r->next_completion = on_complete; r->target_result = target_result; if (r->resolved_version == 0 && !r->resolving) { - gpr_backoff_reset(&r->backoff_state); + grpc_backoff_reset(&r->backoff_state); dns_start_resolving_locked(exec_ctx, r); } else { dns_maybe_finish_next_locked(exec_ctx, r); @@ -153,6 +153,9 @@ static void dns_on_resolved_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_channel_args *result = NULL; GPR_ASSERT(r->resolving); r->resolving = false; + GRPC_ERROR_REF(error); + error = grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS, + grpc_slice_from_copied_string(r->name_to_resolve)); if (r->addresses != NULL) { grpc_lb_addresses *addresses = grpc_lb_addresses_create( r->addresses->naddrs, NULL /* user_data_vtable */); @@ -167,23 +170,21 @@ static void dns_on_resolved_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_resolved_addresses_destroy(r->addresses); grpc_lb_addresses_destroy(exec_ctx, addresses); } else { - gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); - gpr_timespec next_try = gpr_backoff_step(&r->backoff_state, now); - gpr_timespec timeout = gpr_time_sub(next_try, now); + grpc_millis next_try = grpc_backoff_step(exec_ctx, &r->backoff_state); + grpc_millis timeout = next_try - grpc_exec_ctx_now(exec_ctx); gpr_log(GPR_INFO, "dns resolution failed (will retry): %s", grpc_error_string(error)); GPR_ASSERT(!r->have_retry_timer); r->have_retry_timer = true; GRPC_RESOLVER_REF(&r->base, "retry-timer"); - if (gpr_time_cmp(timeout, gpr_time_0(timeout.clock_type)) > 0) { - gpr_log(GPR_DEBUG, "retrying in %" PRId64 ".%09d seconds", timeout.tv_sec, - timeout.tv_nsec); + if (timeout > 0) { + gpr_log(GPR_DEBUG, "retrying in %" PRIdPTR " milliseconds", timeout); } else { gpr_log(GPR_DEBUG, "retrying immediately"); } GRPC_CLOSURE_INIT(&r->on_retry, dns_on_retry_timer_locked, r, grpc_combiner_scheduler(r->base.combiner)); - grpc_timer_init(exec_ctx, &r->retry_timer, next_try, &r->on_retry, now); + grpc_timer_init(exec_ctx, &r->retry_timer, next_try, &r->on_retry); } if (r->resolved_result != NULL) { grpc_channel_args_destroy(exec_ctx, r->resolved_result); @@ -191,6 +192,7 @@ static void dns_on_resolved_locked(grpc_exec_ctx *exec_ctx, void *arg, r->resolved_result = result; r->resolved_version++; dns_maybe_finish_next_locked(exec_ctx, r); + GRPC_ERROR_UNREF(error); GRPC_RESOLVER_UNREF(exec_ctx, &r->base, "dns-resolving"); } @@ -254,11 +256,11 @@ static grpc_resolver *dns_create(grpc_exec_ctx *exec_ctx, grpc_pollset_set_add_pollset_set(exec_ctx, r->interested_parties, args->pollset_set); } - gpr_backoff_init(&r->backoff_state, GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS, - GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER, - GRPC_DNS_RECONNECT_JITTER, - GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS * 1000, - GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000); + grpc_backoff_init(&r->backoff_state, GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS, + GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER, + GRPC_DNS_RECONNECT_JITTER, + GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS * 1000, + GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000); return &r->base; } diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index bff5001d69..5710a22178 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -31,6 +31,7 @@ #include "src/core/ext/filters/client_channel/proxy_mapper_registry.h" #include "src/core/ext/filters/client_channel/subchannel_index.h" #include "src/core/ext/filters/client_channel/uri_parser.h" +#include "src/core/lib/backoff/backoff.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/connected_channel.h" #include "src/core/lib/debug/stats.h" @@ -38,7 +39,6 @@ #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" -#include "src/core/lib/support/backoff.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/surface/channel_init.h" #include "src/core/lib/transport/connectivity_state.h" @@ -118,9 +118,9 @@ struct grpc_subchannel { external_state_watcher root_external_state_watcher; /** next connect attempt time */ - gpr_timespec next_attempt; + grpc_millis next_attempt; /** backoff state */ - gpr_backoff backoff_state; + grpc_backoff backoff_state; /** do we have an active alarm? */ bool have_alarm; /** have we started the backoff loop */ @@ -364,7 +364,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, } } } - gpr_backoff_init( + grpc_backoff_init( &c->backoff_state, initial_backoff_ms, fixed_reconnect_backoff ? 1.0 : GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER, @@ -428,8 +428,7 @@ static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { } if (error == GRPC_ERROR_NONE) { gpr_log(GPR_INFO, "Failed to connect to channel, retrying"); - c->next_attempt = - gpr_backoff_step(&c->backoff_state, gpr_now(GPR_CLOCK_MONOTONIC)); + c->next_attempt = grpc_backoff_step(exec_ctx, &c->backoff_state); continue_connect_locked(exec_ctx, c); gpr_mu_unlock(&c->mu); } else { @@ -464,24 +463,22 @@ static void maybe_start_connecting_locked(grpc_exec_ctx *exec_ctx, c->connecting = true; GRPC_SUBCHANNEL_WEAK_REF(c, "connecting"); - gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); if (!c->backoff_begun) { c->backoff_begun = true; - c->next_attempt = gpr_backoff_begin(&c->backoff_state, now); + c->next_attempt = grpc_backoff_begin(exec_ctx, &c->backoff_state); continue_connect_locked(exec_ctx, c); } else { GPR_ASSERT(!c->have_alarm); c->have_alarm = true; - gpr_timespec time_til_next = gpr_time_sub(c->next_attempt, now); - if (gpr_time_cmp(time_til_next, gpr_time_0(time_til_next.clock_type)) <= - 0) { + const grpc_millis time_til_next = + c->next_attempt - grpc_exec_ctx_now(exec_ctx); + if (time_til_next <= 0) { gpr_log(GPR_INFO, "Retry immediately"); } else { - gpr_log(GPR_INFO, "Retry in %" PRId64 ".%09d seconds", - time_til_next.tv_sec, time_til_next.tv_nsec); + gpr_log(GPR_INFO, "Retry in %" PRIdPTR " milliseconds", time_til_next); } GRPC_CLOSURE_INIT(&c->on_alarm, on_alarm, c, grpc_schedule_on_exec_ctx); - grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt, &c->on_alarm, now); + grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt, &c->on_alarm); } } diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index c67ff97b0b..46b29f1fe0 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -107,7 +107,7 @@ typedef struct { grpc_polling_entity *pollent; grpc_slice path; gpr_timespec start_time; - gpr_timespec deadline; + grpc_millis deadline; gpr_arena *arena; grpc_call_context_element *context; grpc_call_combiner *call_combiner; diff --git a/src/core/ext/filters/deadline/deadline_filter.cc b/src/core/ext/filters/deadline/deadline_filter.cc index 866ce46acf..dc194ec068 100644 --- a/src/core/ext/filters/deadline/deadline_filter.cc +++ b/src/core/ext/filters/deadline/deadline_filter.cc @@ -86,9 +86,8 @@ static void timer_callback(grpc_exec_ctx* exec_ctx, void* arg, // synchronized. static void start_timer_if_needed(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, - gpr_timespec deadline) { - deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); - if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) == 0) { + grpc_millis deadline) { + if (deadline == GRPC_MILLIS_INF_FUTURE) { return; } grpc_deadline_state* deadline_state = (grpc_deadline_state*)elem->call_data; @@ -114,8 +113,7 @@ static void start_timer_if_needed(grpc_exec_ctx* exec_ctx, } GPR_ASSERT(closure != NULL); GRPC_CALL_STACK_REF(deadline_state->call_stack, "deadline_timer"); - grpc_timer_init(exec_ctx, &deadline_state->timer, deadline, closure, - gpr_now(GPR_CLOCK_MONOTONIC)); + grpc_timer_init(exec_ctx, &deadline_state->timer, deadline, closure); } // Cancels the deadline timer. @@ -155,7 +153,7 @@ static void inject_on_complete_cb(grpc_deadline_state* deadline_state, struct start_timer_after_init_state { bool in_call_combiner; grpc_call_element* elem; - gpr_timespec deadline; + grpc_millis deadline; grpc_closure closure; }; static void start_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg, @@ -182,14 +180,13 @@ static void start_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg, void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, grpc_call_stack* call_stack, grpc_call_combiner* call_combiner, - gpr_timespec deadline) { + grpc_millis deadline) { grpc_deadline_state* deadline_state = (grpc_deadline_state*)elem->call_data; deadline_state->call_stack = call_stack; deadline_state->call_combiner = call_combiner; // Deadline will always be infinite on servers, so the timer will only be // set on clients with a finite deadline. - deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); - if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) { + if (deadline != GRPC_MILLIS_INF_FUTURE) { // When the deadline passes, we indicate the failure by sending down // an op with cancel_error set. However, we can't send down any ops // until after the call stack is fully initialized. If we start the @@ -214,7 +211,7 @@ void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx, } void grpc_deadline_state_reset(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, - gpr_timespec new_deadline) { + grpc_millis new_deadline) { grpc_deadline_state* deadline_state = (grpc_deadline_state*)elem->call_data; cancel_timer_if_needed(exec_ctx, deadline_state); start_timer_if_needed(exec_ctx, elem, new_deadline); diff --git a/src/core/ext/filters/deadline/deadline_filter.h b/src/core/ext/filters/deadline/deadline_filter.h index f4a1110ee6..4a80535b14 100644 --- a/src/core/ext/filters/deadline/deadline_filter.h +++ b/src/core/ext/filters/deadline/deadline_filter.h @@ -56,7 +56,8 @@ typedef struct grpc_deadline_state { void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, grpc_call_stack* call_stack, grpc_call_combiner* call_combiner, - gpr_timespec deadline); + grpc_millis deadline); + void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx, grpc_call_element* elem); @@ -70,7 +71,7 @@ void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx, // // Note: Must be called while holding the call combiner. void grpc_deadline_state_reset(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, - gpr_timespec new_deadline); + grpc_millis new_deadline); // To be called from the client-side filter's start_transport_stream_op_batch() // method. Ensures that the deadline timer is cancelled when the call diff --git a/src/core/ext/filters/max_age/max_age_filter.cc b/src/core/ext/filters/max_age/max_age_filter.cc index f4d5b1427e..ade2e5bc82 100644 --- a/src/core/ext/filters/max_age/max_age_filter.cc +++ b/src/core/ext/filters/max_age/max_age_filter.cc @@ -56,11 +56,11 @@ typedef struct channel_data { max_connection_idle */ grpc_timer max_idle_timer; /* Allowed max time a channel may have no outstanding rpcs */ - gpr_timespec max_connection_idle; + grpc_millis max_connection_idle; /* Allowed max time a channel may exist */ - gpr_timespec max_connection_age; + grpc_millis max_connection_age; /* Allowed grace period after the channel reaches its max age */ - gpr_timespec max_connection_age_grace; + grpc_millis max_connection_age_grace; /* Closure to run when the channel's idle duration reaches max_connection_idle and should be closed gracefully */ grpc_closure close_max_idle_channel; @@ -99,10 +99,9 @@ static void increase_call_count(grpc_exec_ctx* exec_ctx, channel_data* chand) { static void decrease_call_count(grpc_exec_ctx* exec_ctx, channel_data* chand) { if (gpr_atm_full_fetch_add(&chand->call_count, -1) == 1) { GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age max_idle_timer"); - grpc_timer_init( - exec_ctx, &chand->max_idle_timer, - gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), chand->max_connection_idle), - &chand->close_max_idle_channel, gpr_now(GPR_CLOCK_MONOTONIC)); + grpc_timer_init(exec_ctx, &chand->max_idle_timer, + grpc_exec_ctx_now(exec_ctx) + chand->max_connection_idle, + &chand->close_max_idle_channel); } } @@ -123,10 +122,9 @@ static void start_max_age_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg, gpr_mu_lock(&chand->max_age_timer_mu); chand->max_age_timer_pending = true; GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age max_age_timer"); - grpc_timer_init( - exec_ctx, &chand->max_age_timer, - gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), chand->max_connection_age), - &chand->close_max_age_channel, gpr_now(GPR_CLOCK_MONOTONIC)); + grpc_timer_init(exec_ctx, &chand->max_age_timer, + grpc_exec_ctx_now(exec_ctx) + chand->max_connection_age, + &chand->close_max_age_channel); gpr_mu_unlock(&chand->max_age_timer_mu); grpc_transport_op* op = grpc_make_transport_op(NULL); op->on_connectivity_state_change = &chand->channel_connectivity_changed, @@ -144,11 +142,12 @@ static void start_max_age_grace_timer_after_goaway_op(grpc_exec_ctx* exec_ctx, gpr_mu_lock(&chand->max_age_timer_mu); chand->max_age_grace_timer_pending = true; GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age max_age_grace_timer"); - grpc_timer_init(exec_ctx, &chand->max_age_grace_timer, - gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), - chand->max_connection_age_grace), - &chand->force_close_max_age_channel, - gpr_now(GPR_CLOCK_MONOTONIC)); + grpc_timer_init( + exec_ctx, &chand->max_age_grace_timer, + chand->max_connection_age_grace == GRPC_MILLIS_INF_FUTURE + ? GRPC_MILLIS_INF_FUTURE + : grpc_exec_ctx_now(exec_ctx) + chand->max_connection_age_grace, + &chand->force_close_max_age_channel); gpr_mu_unlock(&chand->max_age_timer_mu); GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->channel_stack, "max_age start_max_age_grace_timer_after_goaway_op"); @@ -249,7 +248,8 @@ static void channel_connectivity_changed(grpc_exec_ctx* exec_ctx, void* arg, connection storms. Note that the MAX_CONNECTION_AGE option without jitter would not create connection storms by itself, but if there happened to be a connection storm it could cause it to repeat at a fixed period. */ -static int add_random_max_connection_age_jitter(int value) { +static grpc_millis +add_random_max_connection_age_jitter_and_convert_to_grpc_millis(int value) { /* generate a random number between 1 - MAX_CONNECTION_AGE_JITTER and 1 + MAX_CONNECTION_AGE_JITTER */ double multiplier = rand() * MAX_CONNECTION_AGE_JITTER * 2.0 / RAND_MAX + @@ -257,7 +257,9 @@ static int add_random_max_connection_age_jitter(int value) { double result = multiplier * value; /* INT_MAX - 0.5 converts the value to float, so that result will not be cast to int implicitly before the comparison. */ - return result > INT_MAX - 0.5 ? INT_MAX : (int)result; + return result > ((double)GRPC_MILLIS_INF_FUTURE) - 0.5 + ? GRPC_MILLIS_INF_FUTURE + : (grpc_millis)result; } /* Constructor for call_data. */ @@ -287,45 +289,36 @@ static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, chand->max_age_grace_timer_pending = false; chand->channel_stack = args->channel_stack; chand->max_connection_age = - DEFAULT_MAX_CONNECTION_AGE_MS == INT_MAX - ? gpr_inf_future(GPR_TIMESPAN) - : gpr_time_from_millis(add_random_max_connection_age_jitter( - DEFAULT_MAX_CONNECTION_AGE_MS), - GPR_TIMESPAN); + add_random_max_connection_age_jitter_and_convert_to_grpc_millis( + DEFAULT_MAX_CONNECTION_AGE_MS); chand->max_connection_age_grace = DEFAULT_MAX_CONNECTION_AGE_GRACE_MS == INT_MAX - ? gpr_inf_future(GPR_TIMESPAN) - : gpr_time_from_millis(DEFAULT_MAX_CONNECTION_AGE_GRACE_MS, - GPR_TIMESPAN); - chand->max_connection_idle = - DEFAULT_MAX_CONNECTION_IDLE_MS == INT_MAX - ? gpr_inf_future(GPR_TIMESPAN) - : gpr_time_from_millis(DEFAULT_MAX_CONNECTION_IDLE_MS, GPR_TIMESPAN); + ? GRPC_MILLIS_INF_FUTURE + : DEFAULT_MAX_CONNECTION_AGE_GRACE_MS; + chand->max_connection_idle = DEFAULT_MAX_CONNECTION_IDLE_MS == INT_MAX + ? GRPC_MILLIS_INF_FUTURE + : DEFAULT_MAX_CONNECTION_IDLE_MS; for (size_t i = 0; i < args->channel_args->num_args; ++i) { if (0 == strcmp(args->channel_args->args[i].key, GRPC_ARG_MAX_CONNECTION_AGE_MS)) { const int value = grpc_channel_arg_get_integer( &args->channel_args->args[i], MAX_CONNECTION_AGE_INTEGER_OPTIONS); chand->max_connection_age = - value == INT_MAX - ? gpr_inf_future(GPR_TIMESPAN) - : gpr_time_from_millis( - add_random_max_connection_age_jitter(value), GPR_TIMESPAN); + add_random_max_connection_age_jitter_and_convert_to_grpc_millis( + value); } else if (0 == strcmp(args->channel_args->args[i].key, GRPC_ARG_MAX_CONNECTION_AGE_GRACE_MS)) { const int value = grpc_channel_arg_get_integer( &args->channel_args->args[i], {DEFAULT_MAX_CONNECTION_AGE_GRACE_MS, 0, INT_MAX}); chand->max_connection_age_grace = - value == INT_MAX ? gpr_inf_future(GPR_TIMESPAN) - : gpr_time_from_millis(value, GPR_TIMESPAN); + value == INT_MAX ? GRPC_MILLIS_INF_FUTURE : value; } else if (0 == strcmp(args->channel_args->args[i].key, GRPC_ARG_MAX_CONNECTION_IDLE_MS)) { const int value = grpc_channel_arg_get_integer( &args->channel_args->args[i], MAX_CONNECTION_IDLE_INTEGER_OPTIONS); chand->max_connection_idle = - value == INT_MAX ? gpr_inf_future(GPR_TIMESPAN) - : gpr_time_from_millis(value, GPR_TIMESPAN); + value == INT_MAX ? GRPC_MILLIS_INF_FUTURE : value; } } GRPC_CLOSURE_INIT(&chand->close_max_idle_channel, close_max_idle_channel, @@ -348,8 +341,7 @@ static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, channel_connectivity_changed, chand, grpc_schedule_on_exec_ctx); - if (gpr_time_cmp(chand->max_connection_age, gpr_inf_future(GPR_TIMESPAN)) != - 0) { + if (chand->max_connection_age != GRPC_MILLIS_INF_FUTURE) { /* When the channel reaches its max age, we send down an op with goaway_error set. However, we can't send down any ops until after the channel stack is fully initialized. If we start the timer here, we have @@ -366,8 +358,7 @@ static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, /* Initialize the number of calls as 1, so that the max_idle_timer will not start until start_max_idle_timer_after_init is invoked. */ gpr_atm_rel_store(&chand->call_count, 1); - if (gpr_time_cmp(chand->max_connection_idle, gpr_inf_future(GPR_TIMESPAN)) != - 0) { + if (chand->max_connection_idle != GRPC_MILLIS_INF_FUTURE) { GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age start_max_idle_timer_after_init"); GRPC_CLOSURE_SCHED(exec_ctx, &chand->start_max_idle_timer_after_init, diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.cc b/src/core/ext/transport/chttp2/server/chttp2_server.cc index a51959bec7..7ac7f4ece8 100644 --- a/src/core/ext/transport/chttp2/server/chttp2_server.cc +++ b/src/core/ext/transport/chttp2/server/chttp2_server.cc @@ -134,8 +134,8 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp, connection_state->handshake_mgr); // TODO(roth): We should really get this timeout value from channel // args instead of hard-coding it. - const gpr_timespec deadline = gpr_time_add( - gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(120, GPR_TIMESPAN)); + const grpc_millis deadline = + grpc_exec_ctx_now(exec_ctx) + 120 * GPR_MS_PER_SEC; grpc_handshake_manager_do_handshake(exec_ctx, connection_state->handshake_mgr, tcp, state->args, deadline, acceptor, on_handshake_done, connection_state); diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 65167c03bb..e4b19a2c4a 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -159,11 +159,9 @@ static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, static void cancel_pings(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_error *error); -static void send_ping_locked( - grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_chttp2_ping_type ping_type, grpc_closure *on_initiate, - grpc_closure *on_complete, - grpc_chttp2_initiate_write_reason initiate_write_reason); +static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + grpc_closure *on_initiate, + grpc_closure *on_complete); static void retry_initiate_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, grpc_error *error); @@ -279,6 +277,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->is_client = is_client; t->flow_control.remote_window = DEFAULT_WINDOW; t->flow_control.announced_window = DEFAULT_WINDOW; + t->flow_control.target_initial_window_size = DEFAULT_WINDOW; t->flow_control.t = t; t->deframe_state = is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0; t->is_first_frame = true; @@ -317,17 +316,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_combiner_scheduler(t->combiner)); grpc_bdp_estimator_init(&t->flow_control.bdp_estimator, t->peer_string); - t->flow_control.last_pid_update = gpr_now(GPR_CLOCK_MONOTONIC); - grpc_pid_controller_init(&t->flow_control.pid_controller, - { - 4, /* gain_p */ - 8, /* gain_t */ - 0, /* gain_d */ - log2(DEFAULT_WINDOW), /* initial_control_value */ - -1, /* min_control_value */ - 25, /* max_control_value */ - 10 /* integral_range */ - }); grpc_chttp2_goaway_parser_init(&t->goaway_parser); grpc_chttp2_hpack_parser_init(exec_ctx, &t->hpack_parser); @@ -366,43 +354,33 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0); } - queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, - DEFAULT_WINDOW); queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE, DEFAULT_MAX_HEADER_LIST_SIZE); queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA, 1); t->ping_policy.max_pings_without_data = g_default_max_pings_without_data; - t->ping_policy.min_sent_ping_interval_without_data = gpr_time_from_millis( - g_default_min_sent_ping_interval_without_data_ms, GPR_TIMESPAN); + t->ping_policy.min_sent_ping_interval_without_data = + g_default_min_sent_ping_interval_without_data_ms; t->ping_policy.max_ping_strikes = g_default_max_ping_strikes; - t->ping_policy.min_recv_ping_interval_without_data = gpr_time_from_millis( - g_default_min_recv_ping_interval_without_data_ms, GPR_TIMESPAN); + t->ping_policy.min_recv_ping_interval_without_data = + g_default_min_recv_ping_interval_without_data_ms; /* Keepalive setting */ if (t->is_client) { - t->keepalive_time = - g_default_client_keepalive_time_ms == INT_MAX - ? gpr_inf_future(GPR_TIMESPAN) - : gpr_time_from_millis(g_default_client_keepalive_time_ms, - GPR_TIMESPAN); - t->keepalive_timeout = - g_default_client_keepalive_timeout_ms == INT_MAX - ? gpr_inf_future(GPR_TIMESPAN) - : gpr_time_from_millis(g_default_client_keepalive_timeout_ms, - GPR_TIMESPAN); + t->keepalive_time = g_default_client_keepalive_time_ms == INT_MAX + ? GRPC_MILLIS_INF_FUTURE + : g_default_client_keepalive_time_ms; + t->keepalive_timeout = g_default_client_keepalive_timeout_ms == INT_MAX + ? GRPC_MILLIS_INF_FUTURE + : g_default_client_keepalive_timeout_ms; } else { - t->keepalive_time = - g_default_server_keepalive_time_ms == INT_MAX - ? gpr_inf_future(GPR_TIMESPAN) - : gpr_time_from_millis(g_default_server_keepalive_time_ms, - GPR_TIMESPAN); - t->keepalive_timeout = - g_default_server_keepalive_timeout_ms == INT_MAX - ? gpr_inf_future(GPR_TIMESPAN) - : gpr_time_from_millis(g_default_server_keepalive_timeout_ms, - GPR_TIMESPAN); + t->keepalive_time = g_default_server_keepalive_time_ms == INT_MAX + ? GRPC_MILLIS_INF_FUTURE + : g_default_server_keepalive_time_ms; + t->keepalive_timeout = g_default_server_keepalive_timeout_ms == INT_MAX + ? GRPC_MILLIS_INF_FUTURE + : g_default_server_keepalive_timeout_ms; } t->keepalive_permit_without_calls = g_default_keepalive_permit_without_calls; @@ -447,23 +425,21 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, channel_args->args[i].key, GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS)) { t->ping_policy.min_sent_ping_interval_without_data = - gpr_time_from_millis( - grpc_channel_arg_get_integer( - &channel_args->args[i], - {g_default_min_sent_ping_interval_without_data_ms, 0, - INT_MAX}), - GPR_TIMESPAN); + grpc_channel_arg_get_integer( + &channel_args->args[i], + grpc_integer_options{ + g_default_min_sent_ping_interval_without_data_ms, 0, + INT_MAX}); } else if (0 == strcmp( channel_args->args[i].key, GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS)) { t->ping_policy.min_recv_ping_interval_without_data = - gpr_time_from_millis( - grpc_channel_arg_get_integer( - &channel_args->args[i], - {g_default_min_recv_ping_interval_without_data_ms, 0, - INT_MAX}), - GPR_TIMESPAN); + grpc_channel_arg_get_integer( + &channel_args->args[i], + grpc_integer_options{ + g_default_min_recv_ping_interval_without_data_ms, 0, + INT_MAX}); } else if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE)) { t->write_buffer_size = (uint32_t)grpc_channel_arg_get_integer( @@ -476,22 +452,21 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, GRPC_ARG_KEEPALIVE_TIME_MS)) { const int value = grpc_channel_arg_get_integer( &channel_args->args[i], - {t->is_client ? g_default_client_keepalive_time_ms - : g_default_server_keepalive_time_ms, - 1, INT_MAX}); - t->keepalive_time = value == INT_MAX - ? gpr_inf_future(GPR_TIMESPAN) - : gpr_time_from_millis(value, GPR_TIMESPAN); + grpc_integer_options{t->is_client + ? g_default_client_keepalive_time_ms + : g_default_server_keepalive_time_ms, + 1, INT_MAX}); + t->keepalive_time = value == INT_MAX ? GRPC_MILLIS_INF_FUTURE : value; } else if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_KEEPALIVE_TIMEOUT_MS)) { const int value = grpc_channel_arg_get_integer( &channel_args->args[i], - {t->is_client ? g_default_client_keepalive_timeout_ms - : g_default_server_keepalive_timeout_ms, - 0, INT_MAX}); - t->keepalive_timeout = value == INT_MAX - ? gpr_inf_future(GPR_TIMESPAN) - : gpr_time_from_millis(value, GPR_TIMESPAN); + grpc_integer_options{t->is_client + ? g_default_client_keepalive_timeout_ms + : g_default_server_keepalive_timeout_ms, + 0, INT_MAX}); + t->keepalive_timeout = + value == INT_MAX ? GRPC_MILLIS_INF_FUTURE : value; } else if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)) { t->keepalive_permit_without_calls = @@ -571,23 +546,27 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->ping_state.pings_before_data_required = 0; t->ping_state.is_delayed_ping_timer_set = false; - t->ping_recv_state.last_ping_recv_time = gpr_inf_past(GPR_CLOCK_MONOTONIC); + t->ping_recv_state.last_ping_recv_time = GRPC_MILLIS_INF_PAST; t->ping_recv_state.ping_strikes = 0; /* Start keepalive pings */ - if (gpr_time_cmp(t->keepalive_time, gpr_inf_future(GPR_TIMESPAN)) != 0) { + if (t->keepalive_time != GRPC_MILLIS_INF_FUTURE) { t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING; GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); - grpc_timer_init( - exec_ctx, &t->keepalive_ping_timer, - gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_time), - &t->init_keepalive_ping_locked, gpr_now(GPR_CLOCK_MONOTONIC)); + grpc_timer_init(exec_ctx, &t->keepalive_ping_timer, + grpc_exec_ctx_now(exec_ctx) + t->keepalive_time, + &t->init_keepalive_ping_locked); } else { /* Use GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED to indicate there are no inflight keeaplive timers */ t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED; } + grpc_chttp2_act_on_flowctl_action( + exec_ctx, + grpc_chttp2_flowctl_get_action(exec_ctx, &t->flow_control, NULL), t, + NULL); + grpc_chttp2_initiate_write(exec_ctx, t, GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE); post_benign_reclaimer(exec_ctx, t); @@ -698,7 +677,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[1], arena); grpc_chttp2_data_parser_init(&s->data_parser); grpc_slice_buffer_init(&s->flow_controlled_buffer); - s->deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + s->deadline = GRPC_MILLIS_INF_FUTURE; GRPC_CLOSURE_INIT(&s->complete_fetch_locked, complete_fetch_locked, s, grpc_schedule_on_exec_ctx); grpc_slice_buffer_init(&s->unprocessed_incoming_frames_buffer); @@ -902,9 +881,6 @@ static void inc_initiate_write_reason( case GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS: GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_SETTINGS(exec_ctx); break; - case GRPC_CHTTP2_INITIATE_WRITE_BDP_ESTIMATOR_PING: - GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_BDP_ESTIMATOR_PING(exec_ctx); - break; case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING: GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_SETTING( exec_ctx); @@ -1042,6 +1018,7 @@ static void write_action_begin_locked(grpc_exec_ctx *exec_ctx, void *gt, write_action, t, scheduler), GRPC_ERROR_NONE); } else { + GRPC_STATS_INC_HTTP2_SPURIOUS_WRITES_BEGUN(exec_ctx); set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_IDLE, "begin writing nothing"); GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "writing"); @@ -1140,14 +1117,12 @@ void grpc_chttp2_add_incoming_goaway(grpc_exec_ctx *exec_ctx, gpr_log(GPR_ERROR, "Received a GOAWAY with error code ENHANCE_YOUR_CALM and debug " "data equal to \"too_many_pings\""); - double current_keepalive_time_ms = - gpr_timespec_to_micros(t->keepalive_time) / 1000; + double current_keepalive_time_ms = (double)t->keepalive_time; t->keepalive_time = current_keepalive_time_ms > INT_MAX / KEEPALIVE_TIME_BACKOFF_MULTIPLIER - ? gpr_inf_future(GPR_TIMESPAN) - : gpr_time_from_millis((int64_t)(current_keepalive_time_ms * - KEEPALIVE_TIME_BACKOFF_MULTIPLIER), - GPR_TIMESPAN); + ? GRPC_MILLIS_INF_FUTURE + : (grpc_millis)(current_keepalive_time_ms * + KEEPALIVE_TIME_BACKOFF_MULTIPLIER); } /* lie: use transient failure from the transport to indicate goaway has been @@ -1461,8 +1436,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, t->settings[GRPC_PEER_SETTINGS] [GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE]; if (t->is_client) { - s->deadline = - gpr_time_min(s->deadline, s->send_initial_metadata->deadline); + s->deadline = GPR_MIN(s->deadline, s->send_initial_metadata->deadline); } if (metadata_size > metadata_peer_limit) { grpc_chttp2_cancel_stream( @@ -1646,8 +1620,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, &t->flow_control, &s->flow_control, GRPC_HEADER_SIZE_IN_BYTES, already_received); grpc_chttp2_act_on_flowctl_action( - exec_ctx, - grpc_chttp2_flowctl_get_action(&t->flow_control, &s->flow_control), + exec_ctx, grpc_chttp2_flowctl_get_action(exec_ctx, &t->flow_control, + &s->flow_control), t, s); } } @@ -1680,16 +1654,14 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, if (!t->is_client) { if (op->send_initial_metadata) { - gpr_timespec deadline = + grpc_millis deadline = op->payload->send_initial_metadata.send_initial_metadata->deadline; - GPR_ASSERT(0 == - gpr_time_cmp(gpr_inf_future(deadline.clock_type), deadline)); + GPR_ASSERT(deadline == GRPC_MILLIS_INF_FUTURE); } if (op->send_trailing_metadata) { - gpr_timespec deadline = + grpc_millis deadline = op->payload->send_trailing_metadata.send_trailing_metadata->deadline; - GPR_ASSERT(0 == - gpr_time_cmp(gpr_inf_future(deadline.clock_type), deadline)); + GPR_ASSERT(deadline == GRPC_MILLIS_INF_FUTURE); } } @@ -1713,28 +1685,21 @@ static void cancel_pings(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_error *error) { /* callback remaining pings: they're not allowed to call into the transpot, and maybe they hold resources that need to be freed */ - for (size_t i = 0; i < GRPC_CHTTP2_PING_TYPE_COUNT; i++) { - grpc_chttp2_ping_queue *pq = &t->ping_queues[i]; - for (size_t j = 0; j < GRPC_CHTTP2_PCL_COUNT; j++) { - grpc_closure_list_fail_all(&pq->lists[j], GRPC_ERROR_REF(error)); - GRPC_CLOSURE_LIST_SCHED(exec_ctx, &pq->lists[j]); - } + grpc_chttp2_ping_queue *pq = &t->ping_queue; + for (size_t j = 0; j < GRPC_CHTTP2_PCL_COUNT; j++) { + grpc_closure_list_fail_all(&pq->lists[j], GRPC_ERROR_REF(error)); + GRPC_CLOSURE_LIST_SCHED(exec_ctx, &pq->lists[j]); } GRPC_ERROR_UNREF(error); } -static void send_ping_locked( - grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_chttp2_ping_type ping_type, grpc_closure *on_initiate, - grpc_closure *on_ack, - grpc_chttp2_initiate_write_reason initiate_write_reason) { - grpc_chttp2_ping_queue *pq = &t->ping_queues[ping_type]; +static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + grpc_closure *on_initiate, grpc_closure *on_ack) { + grpc_chttp2_ping_queue *pq = &t->ping_queue; grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INITIATE], on_initiate, GRPC_ERROR_NONE); - if (grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_NEXT], on_ack, - GRPC_ERROR_NONE)) { - grpc_chttp2_initiate_write(exec_ctx, t, initiate_write_reason); - } + grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_NEXT], on_ack, + GRPC_ERROR_NONE); } static void retry_initiate_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, @@ -1749,8 +1714,7 @@ static void retry_initiate_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, uint64_t id) { - grpc_chttp2_ping_queue *pq = - &t->ping_queues[id % GRPC_CHTTP2_PING_TYPE_COUNT]; + grpc_chttp2_ping_queue *pq = &t->ping_queue; if (pq->inflight_id != id) { char *from = grpc_endpoint_get_peer(t->ep); gpr_log(GPR_DEBUG, "Unknown ping response from %s: %" PRIx64, from, id); @@ -1769,8 +1733,8 @@ static void send_goaway(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED; grpc_http2_error_code http_error; grpc_slice slice; - grpc_error_get_status(error, gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL, - &slice, &http_error); + grpc_error_get_status(exec_ctx, error, GRPC_MILLIS_INF_FUTURE, NULL, &slice, + &http_error); grpc_chttp2_goaway_append(t->last_new_stream_id, (uint32_t)http_error, grpc_slice_ref_internal(slice), &t->qbuf); grpc_chttp2_initiate_write(exec_ctx, t, @@ -1780,7 +1744,7 @@ static void send_goaway(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, void grpc_chttp2_add_ping_strike(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { - gpr_log(GPR_DEBUG, "PING strike"); + t->ping_recv_state.ping_strikes++; if (++t->ping_recv_state.ping_strikes > t->ping_policy.max_ping_strikes && t->ping_policy.max_ping_strikes != 0) { send_goaway(exec_ctx, t, @@ -1820,9 +1784,9 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, } if (op->send_ping) { - send_ping_locked(exec_ctx, t, GRPC_CHTTP2_PING_ON_NEXT_WRITE, NULL, - op->send_ping, - GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING); + send_ping_locked(exec_ctx, t, NULL, op->send_ping); + grpc_chttp2_initiate_write(exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING); } if (op->on_connectivity_state_change != NULL) { @@ -2069,7 +2033,8 @@ void grpc_chttp2_cancel_stream(grpc_exec_ctx *exec_ctx, if (!s->read_closed || !s->write_closed) { if (s->id != 0) { grpc_http2_error_code http_error; - grpc_error_get_status(due_to_error, s->deadline, NULL, NULL, &http_error); + grpc_error_get_status(exec_ctx, due_to_error, s->deadline, NULL, NULL, + &http_error); grpc_slice_buffer_add( &t->qbuf, grpc_chttp2_rst_stream_create(s->id, (uint32_t)http_error, &s->stats.outgoing)); @@ -2087,7 +2052,7 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_error *error) { grpc_status_code status; grpc_slice slice; - grpc_error_get_status(error, s->deadline, &status, &slice, NULL); + grpc_error_get_status(exec_ctx, error, s->deadline, &status, &slice, NULL); if (status != GRPC_STATUS_OK) { s->seen_error = true; @@ -2252,7 +2217,8 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, uint32_t len = 0; grpc_status_code grpc_status; grpc_slice slice; - grpc_error_get_status(error, s->deadline, &grpc_status, &slice, NULL); + grpc_error_get_status(exec_ctx, error, s->deadline, &grpc_status, &slice, + NULL); GPR_ASSERT(grpc_status >= 0 && (int)grpc_status < 100); @@ -2469,10 +2435,8 @@ void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx, if (action.need_ping) { GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping"); grpc_bdp_estimator_schedule_ping(&t->flow_control.bdp_estimator); - send_ping_locked(exec_ctx, t, - GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE, - &t->start_bdp_ping_locked, &t->finish_bdp_ping_locked, - GRPC_CHTTP2_INITIATE_WRITE_BDP_ESTIMATOR_PING); + send_ping_locked(exec_ctx, t, &t->start_bdp_ping_locked, + &t->finish_bdp_ping_locked); } } @@ -2580,7 +2544,8 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer, &t->read_action_locked); grpc_chttp2_act_on_flowctl_action( - exec_ctx, grpc_chttp2_flowctl_get_bdp_action(&t->flow_control), t, + exec_ctx, + grpc_chttp2_flowctl_get_action(exec_ctx, &t->flow_control, NULL), t, NULL); GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keep_reading"); } else { @@ -2613,7 +2578,7 @@ static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, if (GRPC_TRACER_ON(grpc_http_trace)) { gpr_log(GPR_DEBUG, "%s: Complete BDP ping", t->peer_string); } - grpc_bdp_estimator_complete_ping(&t->flow_control.bdp_estimator); + grpc_bdp_estimator_complete_ping(exec_ctx, &t->flow_control.bdp_estimator); GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping"); } @@ -2687,24 +2652,22 @@ static void init_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_chttp2_stream_map_size(&t->stream_map) > 0) { t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING; GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end"); - send_ping_locked(exec_ctx, t, GRPC_CHTTP2_PING_ON_NEXT_WRITE, - &t->start_keepalive_ping_locked, - &t->finish_keepalive_ping_locked, - GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING); + send_ping_locked(exec_ctx, t, &t->start_keepalive_ping_locked, + &t->finish_keepalive_ping_locked); + grpc_chttp2_initiate_write(exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING); } else { GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); - grpc_timer_init( - exec_ctx, &t->keepalive_ping_timer, - gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_time), - &t->init_keepalive_ping_locked, gpr_now(GPR_CLOCK_MONOTONIC)); + grpc_timer_init(exec_ctx, &t->keepalive_ping_timer, + grpc_exec_ctx_now(exec_ctx) + t->keepalive_time, + &t->init_keepalive_ping_locked); } } else if (error == GRPC_ERROR_CANCELLED) { /* The keepalive ping timer may be cancelled by bdp */ GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); - grpc_timer_init( - exec_ctx, &t->keepalive_ping_timer, - gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_time), - &t->init_keepalive_ping_locked, gpr_now(GPR_CLOCK_MONOTONIC)); + grpc_timer_init(exec_ctx, &t->keepalive_ping_timer, + grpc_exec_ctx_now(exec_ctx) + t->keepalive_time, + &t->init_keepalive_ping_locked); } GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "init keepalive ping"); } @@ -2713,10 +2676,9 @@ static void start_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)arg; GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive watchdog"); - grpc_timer_init( - exec_ctx, &t->keepalive_watchdog_timer, - gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_timeout), - &t->keepalive_watchdog_fired_locked, gpr_now(GPR_CLOCK_MONOTONIC)); + grpc_timer_init(exec_ctx, &t->keepalive_watchdog_timer, + grpc_exec_ctx_now(exec_ctx) + t->keepalive_time, + &t->keepalive_watchdog_fired_locked); } static void finish_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, @@ -2727,10 +2689,9 @@ static void finish_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING; grpc_timer_cancel(exec_ctx, &t->keepalive_watchdog_timer); GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); - grpc_timer_init( - exec_ctx, &t->keepalive_ping_timer, - gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_time), - &t->init_keepalive_ping_locked, gpr_now(GPR_CLOCK_MONOTONIC)); + grpc_timer_init(exec_ctx, &t->keepalive_ping_timer, + grpc_exec_ctx_now(exec_ctx) + t->keepalive_time, + &t->init_keepalive_ping_locked); } } GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keepalive ping end"); @@ -2830,9 +2791,9 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx, bs->next_action.max_size_hint, cur_length); grpc_chttp2_act_on_flowctl_action( - exec_ctx, - grpc_chttp2_flowctl_get_action(&t->flow_control, &s->flow_control), t, - s); + exec_ctx, grpc_chttp2_flowctl_get_action(exec_ctx, &t->flow_control, + &s->flow_control), + t, s); } GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0); if (s->frame_storage.length > 0) { @@ -3180,8 +3141,6 @@ const char *grpc_chttp2_initiate_write_reason_string( return "TRANSPORT_FLOW_CONTROL"; case GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS: return "SEND_SETTINGS"; - case GRPC_CHTTP2_INITIATE_WRITE_BDP_ESTIMATOR_PING: - return "BDP_ESTIMATOR_PING"; case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING: return "FLOW_CONTROL_UNSTALLED_BY_SETTING"; case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE: diff --git a/src/core/ext/transport/chttp2/transport/flow_control.cc b/src/core/ext/transport/chttp2/transport/flow_control.cc index 75eae1f962..639e51da70 100644 --- a/src/core/ext/transport/chttp2/transport/flow_control.cc +++ b/src/core/ext/transport/chttp2/transport/flow_control.cc @@ -176,11 +176,9 @@ static void trace_action(grpc_chttp2_transport_flowctl* tfc, /* How many bytes of incoming flow control would we like to advertise */ static uint32_t grpc_chttp2_target_announced_window( const grpc_chttp2_transport_flowctl* tfc) { - return (uint32_t)GPR_MIN( - (int64_t)((1u << 31) - 1), - tfc->announced_stream_total_over_incoming_window + - tfc->t->settings[GRPC_SENT_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]); + return (uint32_t)GPR_MIN((int64_t)((1u << 31) - 1), + tfc->announced_stream_total_over_incoming_window + + tfc->target_initial_window_size); } // we have sent data on the wire, we must track this in our bookkeeping for the @@ -282,13 +280,14 @@ grpc_error* grpc_chttp2_flowctl_recv_data(grpc_chttp2_transport_flowctl* tfc, // Returns a non zero announce integer if we should send a transport window // update uint32_t grpc_chttp2_flowctl_maybe_send_transport_update( - grpc_chttp2_transport_flowctl* tfc) { + grpc_chttp2_transport_flowctl* tfc, bool writing_anyway) { PRETRACE(tfc, NULL); uint32_t target_announced_window = grpc_chttp2_target_announced_window(tfc); uint32_t threshold_to_send_transport_window_update = tfc->t->outbuf.count > 0 ? 3 * target_announced_window / 4 : target_announced_window / 2; - if (tfc->announced_window <= threshold_to_send_transport_window_update && + if ((writing_anyway || + tfc->announced_window <= threshold_to_send_transport_window_update) && tfc->announced_window != target_announced_window) { uint32_t announce = (uint32_t)GPR_CLAMP( target_announced_window - tfc->announced_window, 0, UINT32_MAX); @@ -393,15 +392,26 @@ static grpc_chttp2_flowctl_urgency delta_is_significant( // Takes in a target and uses the pid controller to return a stabilized // guess at the new bdp. -static double get_pid_controller_guess(grpc_chttp2_transport_flowctl* tfc, +static double get_pid_controller_guess(grpc_exec_ctx* exec_ctx, + grpc_chttp2_transport_flowctl* tfc, double target) { - double bdp_error = target - grpc_pid_controller_last(&tfc->pid_controller); - gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); - gpr_timespec dt_timespec = gpr_time_sub(now, tfc->last_pid_update); - double dt = (double)dt_timespec.tv_sec + dt_timespec.tv_nsec * 1e-9; - if (dt > 0.1) { - dt = 0.1; + grpc_millis now = grpc_exec_ctx_now(exec_ctx); + if (!tfc->pid_controller_initialized) { + tfc->last_pid_update = now; + tfc->pid_controller_initialized = true; + grpc_pid_controller_init( + &tfc->pid_controller, + (grpc_pid_controller_args){.gain_p = 4, + .gain_i = 8, + .gain_d = 0, + .initial_control_value = target, + .min_control_value = -1, + .max_control_value = 25, + .integral_range = 10}); + return pow(2, target); } + double bdp_error = target - grpc_pid_controller_last(&tfc->pid_controller); + double dt = (double)(now - tfc->last_pid_update) * 1e-3; double log2_bdp_guess = grpc_pid_controller_update(&tfc->pid_controller, bdp_error, dt); tfc->last_pid_update = now; @@ -414,20 +424,25 @@ static double get_target_under_memory_pressure( // do not increase window under heavy memory pressure. double memory_pressure = grpc_resource_quota_get_memory_pressure( grpc_resource_user_quota(grpc_endpoint_get_resource_user(tfc->t->ep))); - if (memory_pressure > 0.8) { - target *= 1 - GPR_MIN(1, (memory_pressure - 0.8) / 0.1); + static const double kLowMemPressure = 0.1; + static const double kZeroTarget = 22; + static const double kHighMemPressure = 0.8; + static const double kMaxMemPressure = 0.9; + if (memory_pressure < kLowMemPressure && target < kZeroTarget) { + target = (target - kZeroTarget) * memory_pressure / kLowMemPressure + + kZeroTarget; + } else if (memory_pressure > kHighMemPressure) { + target *= 1 - GPR_MIN(1, (memory_pressure - kHighMemPressure) / + (kMaxMemPressure - kHighMemPressure)); } return target; } grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action( - grpc_chttp2_transport_flowctl* tfc, grpc_chttp2_stream_flowctl* sfc) { + grpc_exec_ctx* exec_ctx, grpc_chttp2_transport_flowctl* tfc, + grpc_chttp2_stream_flowctl* sfc) { grpc_chttp2_flowctl_action action; memset(&action, 0, sizeof(action)); - uint32_t target_announced_window = grpc_chttp2_target_announced_window(tfc); - if (tfc->announced_window < target_announced_window / 2) { - action.send_transport_update = GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY; - } // TODO(ncteisen): tune this if (sfc != NULL && !sfc->s->read_closed) { uint32_t sent_init_window = @@ -442,20 +457,12 @@ grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action( action.send_stream_update = GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE; } } - TRACEACTION(tfc, action); - return action; -} - -grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_bdp_action( - grpc_chttp2_transport_flowctl* tfc) { - grpc_chttp2_flowctl_action action; - memset(&action, 0, sizeof(action)); if (tfc->enable_bdp_probe) { - action.need_ping = grpc_bdp_estimator_need_ping(&tfc->bdp_estimator); + action.need_ping = + grpc_bdp_estimator_need_ping(exec_ctx, &tfc->bdp_estimator); // get bdp estimate and update initial_window accordingly. int64_t estimate = -1; - int32_t bdp = -1; if (grpc_bdp_estimator_get_estimate(&tfc->bdp_estimator, &estimate)) { double target = 1 + log2((double)estimate); @@ -466,17 +473,18 @@ grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_bdp_action( // run our target through the pid controller to stabilize change. // TODO(ncteisen): experiment with other controllers here. - double bdp_guess = get_pid_controller_guess(tfc, target); + double bdp_guess = get_pid_controller_guess(exec_ctx, tfc, target); // Though initial window 'could' drop to 0, we keep the floor at 128 - bdp = GPR_MAX((int32_t)bdp_guess, 128); + tfc->target_initial_window_size = + (int32_t)GPR_CLAMP(bdp_guess, 128, INT32_MAX); grpc_chttp2_flowctl_urgency init_window_update_urgency = - delta_is_significant(tfc, bdp, + delta_is_significant(tfc, tfc->target_initial_window_size, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE); if (init_window_update_urgency != GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED) { action.send_setting_update = init_window_update_urgency; - action.initial_window_size = (uint32_t)bdp; + action.initial_window_size = (uint32_t)tfc->target_initial_window_size; } } @@ -485,8 +493,9 @@ grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_bdp_action( if (grpc_bdp_estimator_get_bw(&tfc->bdp_estimator, &bw_dbl)) { // we target the max of BDP or bandwidth in microseconds. int32_t frame_size = (int32_t)GPR_CLAMP( - GPR_MAX((int32_t)GPR_CLAMP(bw_dbl, 0, INT_MAX) / 1000, bdp), 16384, - 16777215); + GPR_MAX((int32_t)GPR_CLAMP(bw_dbl, 0, INT_MAX) / 1000, + tfc->target_initial_window_size), + 16384, 16777215); grpc_chttp2_flowctl_urgency frame_size_urgency = delta_is_significant( tfc, frame_size, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE); if (frame_size_urgency != GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED) { @@ -497,7 +506,10 @@ grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_bdp_action( } } } - + uint32_t target_announced_window = grpc_chttp2_target_announced_window(tfc); + if (tfc->announced_window < target_announced_window / 2) { + action.send_transport_update = GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY; + } TRACEACTION(tfc, action); return action; } diff --git a/src/core/ext/transport/chttp2/transport/frame_ping.cc b/src/core/ext/transport/chttp2/transport/frame_ping.cc index d431d6b2df..1cfa883ee1 100644 --- a/src/core/ext/transport/chttp2/transport/frame_ping.cc +++ b/src/core/ext/transport/chttp2/transport/frame_ping.cc @@ -89,10 +89,10 @@ grpc_error *grpc_chttp2_ping_parser_parse(grpc_exec_ctx *exec_ctx, void *parser, grpc_chttp2_ack_ping(exec_ctx, t, p->opaque_8bytes); } else { if (!t->is_client) { - gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); - gpr_timespec next_allowed_ping = - gpr_time_add(t->ping_recv_state.last_ping_recv_time, - t->ping_policy.min_recv_ping_interval_without_data); + grpc_millis now = grpc_exec_ctx_now(exec_ctx); + grpc_millis next_allowed_ping = + t->ping_recv_state.last_ping_recv_time + + t->ping_policy.min_recv_ping_interval_without_data; if (t->keepalive_permit_without_calls == 0 && grpc_chttp2_stream_map_size(&t->stream_map) == 0) { @@ -100,11 +100,10 @@ grpc_error *grpc_chttp2_ping_parser_parse(grpc_exec_ctx *exec_ctx, void *parser, no less than two hours. When there is no outstanding streams, we restrict the number of PINGS equivalent to TCP Keep-Alive. */ next_allowed_ping = - gpr_time_add(t->ping_recv_state.last_ping_recv_time, - gpr_time_from_seconds(7200, GPR_TIMESPAN)); + t->ping_recv_state.last_ping_recv_time + 7200 * GPR_MS_PER_SEC; } - if (gpr_time_cmp(next_allowed_ping, now) > 0) { + if (next_allowed_ping > now) { grpc_chttp2_add_ping_strike(exec_ctx, t); } diff --git a/src/core/ext/transport/chttp2/transport/hpack_encoder.cc b/src/core/ext/transport/chttp2/transport/hpack_encoder.cc index 5f1a2708a5..17b8c4ab85 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_encoder.cc +++ b/src/core/ext/transport/chttp2/transport/hpack_encoder.cc @@ -535,12 +535,12 @@ static void hpack_enc(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_compressor *c, #define TIMEOUT_KEY "grpc-timeout" static void deadline_enc(grpc_exec_ctx *exec_ctx, - grpc_chttp2_hpack_compressor *c, gpr_timespec deadline, + grpc_chttp2_hpack_compressor *c, grpc_millis deadline, framer_state *st) { char timeout_str[GRPC_HTTP2_TIMEOUT_ENCODE_MIN_BUFSIZE]; grpc_mdelem mdelem; - grpc_http2_encode_timeout( - gpr_time_sub(deadline, gpr_now(deadline.clock_type)), timeout_str); + grpc_http2_encode_timeout(deadline - grpc_exec_ctx_now(exec_ctx), + timeout_str); mdelem = grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_GRPC_TIMEOUT, grpc_slice_from_copied_string(timeout_str)); hpack_enc(exec_ctx, c, mdelem, st); @@ -660,8 +660,8 @@ void grpc_chttp2_encode_header(grpc_exec_ctx *exec_ctx, for (grpc_linked_mdelem *l = metadata->list.head; l; l = l->next) { hpack_enc(exec_ctx, c, l->md, &st); } - gpr_timespec deadline = metadata->deadline; - if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) != 0) { + grpc_millis deadline = metadata->deadline; + if (deadline != GRPC_MILLIS_INF_FUTURE) { deadline_enc(exec_ctx, c, deadline, &st); } diff --git a/src/core/ext/transport/chttp2/transport/incoming_metadata.cc b/src/core/ext/transport/chttp2/transport/incoming_metadata.cc index ba680a89db..187ce0ea87 100644 --- a/src/core/ext/transport/chttp2/transport/incoming_metadata.cc +++ b/src/core/ext/transport/chttp2/transport/incoming_metadata.cc @@ -29,7 +29,7 @@ void grpc_chttp2_incoming_metadata_buffer_init( grpc_chttp2_incoming_metadata_buffer *buffer, gpr_arena *arena) { buffer->arena = arena; grpc_metadata_batch_init(&buffer->batch); - buffer->batch.deadline = gpr_inf_future(GPR_CLOCK_REALTIME); + buffer->batch.deadline = GRPC_MILLIS_INF_FUTURE; } void grpc_chttp2_incoming_metadata_buffer_destroy( @@ -62,7 +62,7 @@ grpc_error *grpc_chttp2_incoming_metadata_buffer_replace_or_add( } void grpc_chttp2_incoming_metadata_buffer_set_deadline( - grpc_chttp2_incoming_metadata_buffer *buffer, gpr_timespec deadline) { + grpc_chttp2_incoming_metadata_buffer *buffer, grpc_millis deadline) { buffer->batch.deadline = deadline; } diff --git a/src/core/ext/transport/chttp2/transport/incoming_metadata.h b/src/core/ext/transport/chttp2/transport/incoming_metadata.h index 9ffcabd0b9..995e8001b1 100644 --- a/src/core/ext/transport/chttp2/transport/incoming_metadata.h +++ b/src/core/ext/transport/chttp2/transport/incoming_metadata.h @@ -47,7 +47,7 @@ grpc_error *grpc_chttp2_incoming_metadata_buffer_replace_or_add( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer, grpc_mdelem elem) GRPC_MUST_USE_RESULT; void grpc_chttp2_incoming_metadata_buffer_set_deadline( - grpc_chttp2_incoming_metadata_buffer *buffer, gpr_timespec deadline); + grpc_chttp2_incoming_metadata_buffer *buffer, grpc_millis deadline); #ifdef __cplusplus } diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index b6afd90427..5d27bb8d67 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -66,12 +66,6 @@ typedef enum { } grpc_chttp2_write_state; typedef enum { - GRPC_CHTTP2_PING_ON_NEXT_WRITE = 0, - GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE, - GRPC_CHTTP2_PING_TYPE_COUNT /* must be last */ -} grpc_chttp2_ping_type; - -typedef enum { GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY, GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT, } grpc_chttp2_optimization_target; @@ -97,7 +91,6 @@ typedef enum { GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL, GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL, GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, - GRPC_CHTTP2_INITIATE_WRITE_BDP_ESTIMATOR_PING, GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING, GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE, GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING, @@ -118,19 +111,19 @@ typedef struct { typedef struct { int max_pings_without_data; int max_ping_strikes; - gpr_timespec min_sent_ping_interval_without_data; - gpr_timespec min_recv_ping_interval_without_data; + grpc_millis min_sent_ping_interval_without_data; + grpc_millis min_recv_ping_interval_without_data; } grpc_chttp2_repeated_ping_policy; typedef struct { - gpr_timespec last_ping_sent_time; + grpc_millis last_ping_sent_time; int pings_before_data_required; grpc_timer delayed_ping_timer; bool is_delayed_ping_timer_set; } grpc_chttp2_repeated_ping_state; typedef struct { - gpr_timespec last_ping_recv_time; + grpc_millis last_ping_recv_time; int ping_strikes; } grpc_chttp2_server_ping_recv_state; @@ -269,6 +262,8 @@ typedef struct { * to send WINDOW_UPDATE frames. */ int64_t announced_window; + int32_t target_initial_window_size; + /** should we probe bdp? */ bool enable_bdp_probe; @@ -276,8 +271,9 @@ typedef struct { grpc_bdp_estimator bdp_estimator; /* pid controller */ + bool pid_controller_initialized; grpc_pid_controller pid_controller; - gpr_timespec last_pid_update; + grpc_millis last_pid_update; // pointer back to transport for tracing const grpc_chttp2_transport *t; @@ -374,7 +370,7 @@ struct grpc_chttp2_transport { uint32_t last_new_stream_id; /** ping queues for various ping insertion points */ - grpc_chttp2_ping_queue ping_queues[GRPC_CHTTP2_PING_TYPE_COUNT]; + grpc_chttp2_ping_queue ping_queue; grpc_chttp2_repeated_ping_policy ping_policy; grpc_chttp2_repeated_ping_state ping_state; uint64_t ping_ctr; /* unique id for pings */ @@ -459,9 +455,9 @@ struct grpc_chttp2_transport { /** watchdog to kill the transport when waiting for the keepalive ping */ grpc_timer keepalive_watchdog_timer; /** time duration in between pings */ - gpr_timespec keepalive_time; + grpc_millis keepalive_time; /** grace period for a ping to complete before watchdog kicks in */ - gpr_timespec keepalive_timeout; + grpc_millis keepalive_timeout; /** if keepalive pings are allowed when there's no outstanding streams */ bool keepalive_permit_without_calls; /** keep-alive state machine state */ @@ -570,7 +566,7 @@ struct grpc_chttp2_stream { grpc_error *byte_stream_error; /* protected by t combiner */ bool received_last_frame; /* protected by t combiner */ - gpr_timespec deadline; + grpc_millis deadline; /** saw some stream level error */ grpc_error *forced_close_error; @@ -711,7 +707,7 @@ grpc_error *grpc_chttp2_flowctl_recv_data(grpc_chttp2_transport_flowctl *tfc, // returns an announce if we should send a transport update to our peer, // else returns zero uint32_t grpc_chttp2_flowctl_maybe_send_transport_update( - grpc_chttp2_transport_flowctl *tfc); + grpc_chttp2_transport_flowctl *tfc, bool writing_anyway); // returns an announce if we should send a stream update to our peer, else // returns zero @@ -758,10 +754,8 @@ typedef struct { // Reads the flow control data and returns and actionable struct that will tell // chttp2 exactly what it needs to do grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action( - grpc_chttp2_transport_flowctl *tfc, grpc_chttp2_stream_flowctl *sfc); - -grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_bdp_action( - grpc_chttp2_transport_flowctl *tfc); + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_flowctl *tfc, + grpc_chttp2_stream_flowctl *sfc); // Takes in a flow control action and performs all the needed operations. void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx, diff --git a/src/core/ext/transport/chttp2/transport/parsing.cc b/src/core/ext/transport/chttp2/transport/parsing.cc index 3db1ad4123..78886b497a 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.cc +++ b/src/core/ext/transport/chttp2/transport/parsing.cc @@ -359,8 +359,9 @@ static grpc_error *init_data_frame_parser(grpc_exec_ctx *exec_ctx, s == NULL ? NULL : &s->flow_control, t->incoming_frame_size); grpc_chttp2_act_on_flowctl_action( - exec_ctx, grpc_chttp2_flowctl_get_action( - &t->flow_control, s == NULL ? NULL : &s->flow_control), + exec_ctx, + grpc_chttp2_flowctl_get_action(exec_ctx, &t->flow_control, + s == NULL ? NULL : &s->flow_control), t, s); if (err != GRPC_ERROR_NONE) { goto error_handler; @@ -385,7 +386,7 @@ error_handler: t->parser_data = &s->data_parser; t->ping_state.pings_before_data_required = t->ping_policy.max_pings_without_data; - t->ping_state.last_ping_sent_time = gpr_inf_past(GPR_CLOCK_MONOTONIC); + t->ping_state.last_ping_sent_time = GRPC_MILLIS_INF_PAST; return GRPC_ERROR_NONE; } else if (grpc_error_get_int(err, GRPC_ERROR_INT_STREAM_ID, NULL)) { /* handle stream errors by closing the stream */ @@ -430,26 +431,27 @@ static void on_initial_header(grpc_exec_ctx *exec_ctx, void *tp, } if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_GRPC_TIMEOUT)) { - gpr_timespec *cached_timeout = - (gpr_timespec *)grpc_mdelem_get_user_data(md, free_timeout); - gpr_timespec timeout; + grpc_millis *cached_timeout = + static_cast<grpc_millis *>(grpc_mdelem_get_user_data(md, free_timeout)); + grpc_millis timeout; if (cached_timeout == NULL) { /* not already parsed: parse it now, and store the result away */ - cached_timeout = (gpr_timespec *)gpr_malloc(sizeof(gpr_timespec)); + cached_timeout = (grpc_millis *)gpr_malloc(sizeof(grpc_millis)); if (!grpc_http2_decode_timeout(GRPC_MDVALUE(md), cached_timeout)) { char *val = grpc_slice_to_c_string(GRPC_MDVALUE(md)); gpr_log(GPR_ERROR, "Ignoring bad timeout value '%s'", val); gpr_free(val); - *cached_timeout = gpr_inf_future(GPR_TIMESPAN); + *cached_timeout = GRPC_MILLIS_INF_FUTURE; } timeout = *cached_timeout; grpc_mdelem_set_user_data(md, free_timeout, cached_timeout); } else { timeout = *cached_timeout; } - grpc_chttp2_incoming_metadata_buffer_set_deadline( - &s->metadata_buffer[0], - gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), timeout)); + if (timeout != GRPC_MILLIS_INF_FUTURE) { + grpc_chttp2_incoming_metadata_buffer_set_deadline( + &s->metadata_buffer[0], grpc_exec_ctx_now(exec_ctx) + timeout); + } GRPC_MDELEM_UNREF(exec_ctx, md); } else { const size_t new_size = s->metadata_buffer[0].size + GRPC_MDELEM_LENGTH(md); @@ -564,7 +566,7 @@ static grpc_error *init_header_frame_parser(grpc_exec_ctx *exec_ctx, t->ping_state.pings_before_data_required = t->ping_policy.max_pings_without_data; - t->ping_state.last_ping_sent_time = gpr_inf_past(GPR_CLOCK_MONOTONIC); + t->ping_state.last_ping_sent_time = GRPC_MILLIS_INF_PAST; /* could be a new grpc_chttp2_stream or an existing grpc_chttp2_stream */ s = grpc_chttp2_parsing_lookup_stream(t, t->incoming_stream_id); diff --git a/src/core/ext/transport/chttp2/transport/writing.cc b/src/core/ext/transport/chttp2/transport/writing.cc index 25d61dcc7a..c61b70b063 100644 --- a/src/core/ext/transport/chttp2/transport/writing.cc +++ b/src/core/ext/transport/chttp2/transport/writing.cc @@ -42,18 +42,9 @@ static void finish_write_cb(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->write_cb_pool = cb; } -static void collapse_pings_from_into(grpc_chttp2_transport *t, - grpc_chttp2_ping_type ping_type, - grpc_chttp2_ping_queue *pq) { - for (size_t i = 0; i < GRPC_CHTTP2_PCL_COUNT; i++) { - grpc_closure_list_move(&t->ping_queues[ping_type].lists[i], &pq->lists[i]); - } -} - static void maybe_initiate_ping(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_ping_type ping_type) { - grpc_chttp2_ping_queue *pq = &t->ping_queues[ping_type]; + grpc_chttp2_transport *t) { + grpc_chttp2_ping_queue *pq = &t->ping_queue; if (grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) { /* no ping needed: wait */ return; @@ -62,7 +53,8 @@ static void maybe_initiate_ping(grpc_exec_ctx *exec_ctx, /* ping already in-flight: wait */ if (GRPC_TRACER_ON(grpc_http_trace) || GRPC_TRACER_ON(grpc_bdp_estimator_trace)) { - gpr_log(GPR_DEBUG, "Ping delayed [%p]: already pinging", t->peer_string); + gpr_log(GPR_DEBUG, "%s: Ping delayed [%p]: already pinging", + t->is_client ? "CLIENT" : "SERVER", t->peer_string); } return; } @@ -71,51 +63,38 @@ static void maybe_initiate_ping(grpc_exec_ctx *exec_ctx, /* need to receive something of substance before sending a ping again */ if (GRPC_TRACER_ON(grpc_http_trace) || GRPC_TRACER_ON(grpc_bdp_estimator_trace)) { - gpr_log(GPR_DEBUG, "Ping delayed [%p]: too many recent pings: %d/%d", - t->peer_string, t->ping_state.pings_before_data_required, + gpr_log(GPR_DEBUG, "%s: Ping delayed [%p]: too many recent pings: %d/%d", + t->is_client ? "CLIENT" : "SERVER", t->peer_string, + t->ping_state.pings_before_data_required, t->ping_policy.max_pings_without_data); } return; } - gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); - gpr_timespec next_allowed_ping = - gpr_time_add(t->ping_state.last_ping_sent_time, - t->ping_policy.min_sent_ping_interval_without_data); + grpc_millis now = grpc_exec_ctx_now(exec_ctx); + grpc_millis next_allowed_ping = + t->ping_state.last_ping_sent_time + + t->ping_policy.min_sent_ping_interval_without_data; if (t->keepalive_permit_without_calls == 0 && grpc_chttp2_stream_map_size(&t->stream_map) == 0) { - next_allowed_ping = gpr_time_add(t->ping_recv_state.last_ping_recv_time, - gpr_time_from_seconds(7200, GPR_TIMESPAN)); + next_allowed_ping = + t->ping_recv_state.last_ping_recv_time + 7200 * GPR_MS_PER_SEC; } - /* gpr_log(GPR_DEBUG, "next_allowed_ping:%d.%09d now:%d.%09d", - (int)next_allowed_ping.tv_sec, (int)next_allowed_ping.tv_nsec, - (int)now.tv_sec, (int)now.tv_nsec); */ - if (gpr_time_cmp(next_allowed_ping, now) > 0) { + if (next_allowed_ping > now) { /* not enough elapsed time between successive pings */ if (GRPC_TRACER_ON(grpc_http_trace) || GRPC_TRACER_ON(grpc_bdp_estimator_trace)) { gpr_log(GPR_DEBUG, - "Ping delayed [%p]: not enough time elapsed since last ping", - t->peer_string); + "%s: Ping delayed [%p]: not enough time elapsed since last ping", + t->is_client ? "CLIENT" : "SERVER", t->peer_string); } if (!t->ping_state.is_delayed_ping_timer_set) { t->ping_state.is_delayed_ping_timer_set = true; grpc_timer_init(exec_ctx, &t->ping_state.delayed_ping_timer, - next_allowed_ping, &t->retry_initiate_ping_locked, - gpr_now(GPR_CLOCK_MONOTONIC)); + next_allowed_ping, &t->retry_initiate_ping_locked); } return; } - /* coalesce equivalent pings into this one */ - switch (ping_type) { - case GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE: - collapse_pings_from_into(t, GRPC_CHTTP2_PING_ON_NEXT_WRITE, pq); - break; - case GRPC_CHTTP2_PING_ON_NEXT_WRITE: - break; - case GRPC_CHTTP2_PING_TYPE_COUNT: - GPR_UNREACHABLE_CODE(break); - } - pq->inflight_id = t->ping_ctr * GRPC_CHTTP2_PING_TYPE_COUNT + ping_type; + pq->inflight_id = t->ping_ctr; t->ping_ctr++; GRPC_CLOSURE_LIST_SCHED(exec_ctx, &pq->lists[GRPC_CHTTP2_PCL_INITIATE]); grpc_closure_list_move(&pq->lists[GRPC_CHTTP2_PCL_NEXT], @@ -126,7 +105,8 @@ static void maybe_initiate_ping(grpc_exec_ctx *exec_ctx, t->ping_state.last_ping_sent_time = now; if (GRPC_TRACER_ON(grpc_http_trace) || GRPC_TRACER_ON(grpc_bdp_estimator_trace)) { - gpr_log(GPR_DEBUG, "Ping sent [%p]: %d/%d", t->peer_string, + gpr_log(GPR_DEBUG, "%s: Ping sent [%p]: %d/%d", + t->is_client ? "CLIENT" : "SERVER", t->peer_string, t->ping_state.pings_before_data_required, t->ping_policy.max_pings_without_data); } @@ -156,6 +136,25 @@ static bool update_list(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, return sched_any; } +static void report_stall(grpc_chttp2_transport *t, grpc_chttp2_stream *s, + const char *staller) { + gpr_log( + GPR_DEBUG, + "%s:%p stream %d stalled by %s [fc:pending=%" PRIdPTR ":flowed=%" PRId64 + ":peer_initwin=%d:t_win=%" PRId64 ":s_win=%d:s_delta=%" PRId64 "]", + t->peer_string, t, s->id, staller, s->flow_controlled_buffer.length, + s->flow_controlled_bytes_flowed, + t->settings[GRPC_ACKED_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], + t->flow_control.remote_window, + (uint32_t)GPR_MAX( + 0, + s->flow_control.remote_window_delta + + (int64_t)t->settings[GRPC_PEER_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]), + s->flow_control.remote_window_delta); +} + static bool stream_ref_if_not_destroyed(gpr_refcount *r) { gpr_atm count; do { @@ -217,10 +216,9 @@ class WriteContext { void FlushWindowUpdates(grpc_exec_ctx *exec_ctx) { uint32_t transport_announce = - grpc_chttp2_flowctl_maybe_send_transport_update(&t_->flow_control); + grpc_chttp2_flowctl_maybe_send_transport_update(&t_->flow_control, + t_->outbuf.count > 0); if (transport_announce) { - maybe_initiate_ping(exec_ctx, t_, - GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE); grpc_transport_one_way_stats throwaway_stats; grpc_slice_buffer_add( &t_->outbuf, grpc_chttp2_window_update_create(0, transport_announce, @@ -271,8 +269,7 @@ class WriteContext { void ResetPingRecvClock() { if (!t_->is_client) { - t_->ping_recv_state.last_ping_recv_time = - gpr_inf_past(GPR_CLOCK_MONOTONIC); + t_->ping_recv_state.last_ping_recv_time = GRPC_MILLIS_INF_PAST; t_->ping_recv_state.ping_strikes = 0; } } @@ -476,8 +473,10 @@ class StreamWriteContext { if (!data_send_context.AnyOutgoing()) { if (t_->flow_control.remote_window == 0) { + report_stall(t_, s_, "transport"); grpc_chttp2_list_add_stalled_by_transport(t_, s_); } else if (data_send_context.stream_remote_window() == 0) { + report_stall(t_, s_, "stream"); grpc_chttp2_list_add_stalled_by_stream(t_, s_); } return; // early out: nothing to do @@ -588,6 +587,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { WriteContext ctx(exec_ctx, t); ctx.FlushSettings(exec_ctx); + ctx.FlushPingAcks(); ctx.FlushQueuedBuffers(exec_ctx); ctx.EnactHpackSettings(exec_ctx); @@ -617,9 +617,8 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( } ctx.FlushWindowUpdates(exec_ctx); - ctx.FlushPingAcks(); - maybe_initiate_ping(exec_ctx, t, GRPC_CHTTP2_PING_ON_NEXT_WRITE); + maybe_initiate_ping(exec_ctx, t); GPR_TIMER_END("grpc_chttp2_begin_write", 0); diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc index 31739d07dd..1001d74c22 100644 --- a/src/core/ext/transport/inproc/inproc_transport.cc +++ b/src/core/ext/transport/inproc/inproc_transport.cc @@ -150,7 +150,7 @@ typedef struct inproc_stream { grpc_metadata_batch write_buffer_initial_md; bool write_buffer_initial_md_filled; uint32_t write_buffer_initial_md_flags; - gpr_timespec write_buffer_deadline; + grpc_millis write_buffer_deadline; slice_buffer_list write_buffer_message; grpc_metadata_batch write_buffer_trailing_md; bool write_buffer_trailing_md_filled; @@ -180,7 +180,7 @@ typedef struct inproc_stream { grpc_error *cancel_self_error; grpc_error *cancel_other_error; - gpr_timespec deadline; + grpc_millis deadline; bool listed; struct inproc_stream *stream_list_prev; @@ -377,8 +377,8 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, s->cancel_self_error = GRPC_ERROR_NONE; s->cancel_other_error = GRPC_ERROR_NONE; s->write_buffer_cancel_error = GRPC_ERROR_NONE; - s->deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); - s->write_buffer_deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + s->deadline = GRPC_MILLIS_INF_FUTURE; + s->write_buffer_deadline = GRPC_MILLIS_INF_FUTURE; s->stream_list_prev = NULL; gpr_mu_lock(&t->mu->mu); @@ -421,7 +421,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, cs->write_buffer_initial_md_flags, &s->to_read_initial_md, &s->to_read_initial_md_flags, &s->to_read_initial_md_filled); - s->deadline = gpr_time_min(s->deadline, cs->write_buffer_deadline); + s->deadline = GPR_MIN(s->deadline, cs->write_buffer_deadline); grpc_metadata_batch_clear(exec_ctx, &cs->write_buffer_initial_md); cs->write_buffer_initial_md_filled = false; } @@ -956,10 +956,10 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, dest, destflags, destfilled); } if (s->t->is_client) { - gpr_timespec *dl = + grpc_millis *dl = (other == NULL) ? &s->write_buffer_deadline : &other->deadline; - *dl = gpr_time_min(*dl, op->payload->send_initial_metadata - .send_initial_metadata->deadline); + *dl = GPR_MIN(*dl, op->payload->send_initial_metadata + .send_initial_metadata->deadline); s->initial_md_sent = true; } } |