From b38bfc00bf53d6a42f9d3a15e4eb33be7e37c8b6 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 24 Oct 2016 13:21:39 -0700 Subject: Make bdp estimation memory pressure aware --- .../transport/chttp2/transport/chttp2_transport.c | 15 ++++++++-- src/core/lib/iomgr/resource_quota.c | 32 +++++++++++++++++++++- src/core/lib/iomgr/resource_quota.h | 6 ++++ 3 files changed, 49 insertions(+), 4 deletions(-) (limited to 'src/core') diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 11f12941eb..b435937011 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -1811,6 +1811,11 @@ static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx, return error; } +static double memory_pressure_to_error(double memory_pressure) { + if (memory_pressure < 0.8) return 0; + return (1.0 - memory_pressure) * 5 * 4096; +} + static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, grpc_error *error) { GPR_TIMER_BEGIN("reading_action_locked", 0); @@ -1900,12 +1905,16 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, if (grpc_bdp_estimator_get_estimate(&t->bdp_estimator, &estimate)) { gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); gpr_timespec dt_timespec = gpr_time_sub(now, t->last_pid_update); - double dt = dt_timespec.tv_sec + dt_timespec.tv_nsec * 1e-9; + double dt = (double)dt_timespec.tv_sec + dt_timespec.tv_nsec * 1e-9; if (dt > 3) { grpc_pid_controller_reset(&t->pid_controller); } t->bdp_guess += grpc_pid_controller_update( - &t->pid_controller, 2.0 * estimate - t->bdp_guess, dt); + &t->pid_controller, + 2.0 * (double)estimate - t->bdp_guess - + memory_pressure_to_error(grpc_resource_quota_get_memory_pressure( + grpc_endpoint_get_resource_user(t->ep)->resource_quota)), + dt); update_bdp(exec_ctx, t, t->bdp_guess); if (0) gpr_log(GPR_DEBUG, "bdp guess %s: %lf (est=%" PRId64 " dt=%lf int=%lf)", @@ -2050,7 +2059,7 @@ static void incoming_byte_stream_update_flow_control(grpc_exec_ctx *exec_ctx, if (t->retract_incoming_window >= add_max_recv_bytes) { t->retract_incoming_window -= add_max_recv_bytes; } else { - add_max_recv_bytes -= t->retract_incoming_window; + add_max_recv_bytes -= (uint32_t)t->retract_incoming_window; t->retract_incoming_window = 0; GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("op", t, announce_incoming_window, add_max_recv_bytes); diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c index 5466973408..a988ed5637 100644 --- a/src/core/lib/iomgr/resource_quota.c +++ b/src/core/lib/iomgr/resource_quota.c @@ -33,6 +33,8 @@ #include "src/core/lib/iomgr/resource_quota.h" +#include +#include #include #include @@ -44,11 +46,18 @@ int grpc_resource_quota_trace = 0; +#define MEMORY_USAGE_ESTIMATION_MAX 65536 + struct grpc_resource_quota { /* refcount */ gpr_refcount refs; - /* Master combiner lock: all activity on a quota executes under this combiner + /* estimate of current memory usage + scaled to the range [0..RESOURCE_USAGE_ESTIMATION_MAX] */ + gpr_atm memory_usage_estimation; + + /* Master combiner lock: all activity on a quota executes under this + * combiner */ grpc_combiner *combiner; /* Size of the resource quota */ @@ -181,6 +190,16 @@ static void rq_step_sched(grpc_exec_ctx *exec_ctx, GRPC_ERROR_NONE, false); } +/* update the atomically available resource estimate - use no barriers since + timeliness of delivery really doesn't matter much */ +static void rq_update_estimate(grpc_resource_quota *resource_quota) { + gpr_atm_no_barrier_store(&resource_quota->memory_usage_estimation, + (gpr_atm)((1.0 - + ((double)resource_quota->free_pool) / + ((double)resource_quota->size)) * + MEMORY_USAGE_ESTIMATION_MAX)); +} + /* returns true if all allocations are completed */ static bool rq_alloc(grpc_exec_ctx *exec_ctx, grpc_resource_quota *resource_quota) { @@ -193,6 +212,7 @@ static bool rq_alloc(grpc_exec_ctx *exec_ctx, int64_t amt = -resource_user->free_pool; resource_user->free_pool = 0; resource_quota->free_pool -= amt; + rq_update_estimate(resource_quota); if (grpc_resource_quota_trace) { gpr_log(GPR_DEBUG, "BP %s %s: grant alloc %" PRId64 " bytes; rq_free_pool -> %" PRId64, @@ -227,6 +247,7 @@ static bool rq_scavenge(grpc_exec_ctx *exec_ctx, int64_t amt = resource_user->free_pool; resource_user->free_pool = 0; resource_quota->free_pool += amt; + rq_update_estimate(resource_quota); if (grpc_resource_quota_trace) { gpr_log(GPR_DEBUG, "BP %s %s: scavenge %" PRId64 " bytes; rq_free_pool -> %" PRId64, @@ -411,6 +432,7 @@ static void rq_resize(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) { int64_t delta = a->size - a->resource_quota->size; a->resource_quota->size += delta; a->resource_quota->free_pool += delta; + rq_update_estimate(a->resource_quota); if (delta < 0 && a->resource_quota->free_pool < 0) { rq_step_sched(exec_ctx, a->resource_quota); } else if (delta > 0 && @@ -442,6 +464,7 @@ grpc_resource_quota *grpc_resource_quota_create(const char *name) { resource_quota->size = INT64_MAX; resource_quota->step_scheduled = false; resource_quota->reclaiming = false; + gpr_atm_no_barrier_store(&resource_quota->memory_usage_estimation, 0); if (name != NULL) { resource_quota->name = gpr_strdup(name); } else { @@ -482,6 +505,13 @@ void grpc_resource_quota_ref(grpc_resource_quota *resource_quota) { grpc_resource_quota_internal_ref(resource_quota); } +double grpc_resource_quota_get_memory_pressure( + grpc_resource_quota *resource_quota) { + return ((double)(gpr_atm_no_barrier_load( + &resource_quota->memory_usage_estimation))) / + ((double)MEMORY_USAGE_ESTIMATION_MAX); +} + void grpc_resource_quota_resize(grpc_resource_quota *resource_quota, size_t size) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; diff --git a/src/core/lib/iomgr/resource_quota.h b/src/core/lib/iomgr/resource_quota.h index af94a19911..d9e7d8fa90 100644 --- a/src/core/lib/iomgr/resource_quota.h +++ b/src/core/lib/iomgr/resource_quota.h @@ -78,6 +78,12 @@ void grpc_resource_quota_internal_unref(grpc_exec_ctx *exec_ctx, grpc_resource_quota *grpc_resource_quota_from_channel_args( const grpc_channel_args *channel_args); +/* Return a number indicating current memory pressure: + 0.0 ==> no memory usage + 1.0 ==> maximum memory usage */ +double grpc_resource_quota_get_memory_pressure( + grpc_resource_quota *resource_quota); + /* Resource users are kept in (potentially) several intrusive linked lists at once. These are the list names. */ typedef enum { -- cgit v1.2.3