diff options
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/ext/transport/chttp2/transport/chttp2_transport.c | 122 | ||||
-rw-r--r-- | src/core/ext/transport/chttp2/transport/flow_control.c | 177 | ||||
-rw-r--r-- | src/core/ext/transport/chttp2/transport/internal.h | 32 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_epoll1_linux.c | 9 | ||||
-rw-r--r-- | src/core/lib/security/credentials/jwt/jwt_verifier.c | 17 | ||||
-rw-r--r-- | src/core/lib/security/credentials/oauth2/oauth2_credentials.c | 8 | ||||
-rw-r--r-- | src/core/lib/surface/version.c | 2 | ||||
-rw-r--r-- | src/core/lib/transport/bdp_estimator.c | 10 | ||||
-rw-r--r-- | src/core/lib/transport/bdp_estimator.h | 10 |
9 files changed, 241 insertions, 146 deletions
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 2024e5a1a2..831301ddef 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -304,10 +304,10 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, keepalive_watchdog_fired_locked, t, grpc_combiner_scheduler(t->combiner)); - grpc_bdp_estimator_init(&t->bdp_estimator, t->peer_string); - t->last_pid_update = gpr_now(GPR_CLOCK_MONOTONIC); + grpc_bdp_estimator_init(&t->flow_control.bdp_estimator, t->peer_string); + t->flow_control.last_pid_update = gpr_now(GPR_CLOCK_MONOTONIC); grpc_pid_controller_init( - &t->pid_controller, + &t->flow_control.pid_controller, (grpc_pid_controller_args){.gain_p = 4, .gain_i = 8, .gain_d = 0, @@ -340,7 +340,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE; t->sent_local_settings = 0; t->write_buffer_size = DEFAULT_WINDOW; - t->enable_bdp_probe = true; + t->flow_control.enable_bdp_probe = true; if (is_client) { grpc_slice_buffer_add(&t->outbuf, grpc_slice_from_copied_string( @@ -457,7 +457,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, (grpc_integer_options){0, 0, MAX_WRITE_BUFFER_SIZE}); } else if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_BDP_PROBE)) { - t->enable_bdp_probe = grpc_channel_arg_get_integer( + t->flow_control.enable_bdp_probe = grpc_channel_arg_get_integer( &channel_args->args[i], (grpc_integer_options){1, 0, 1}); } else if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_KEEPALIVE_TIME_MS)) { @@ -2262,46 +2262,27 @@ void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx, case GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE: break; } -} - -static void update_bdp(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - double bdp_dbl) { - // initial window size bounded [1,2^31-1], but we set the min to 128. - int32_t bdp = GPR_CLAMP((int32_t)bdp_dbl, 128, INT32_MAX); - int64_t delta = - (int64_t)bdp - - (int64_t)t->settings[GRPC_LOCAL_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; - if (delta == 0 || (delta > -bdp / 10 && delta < bdp / 10)) { - return; - } - if (GRPC_TRACER_ON(grpc_bdp_estimator_trace) || - GRPC_TRACER_ON(grpc_flowctl_trace)) { - gpr_log(GPR_DEBUG, "%s | %p[%s] | update initial window size to %d", - t->peer_string, t, t->is_client ? "cli" : "svr", (int)bdp); - } - queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, - (uint32_t)bdp); -} - -static void update_frame(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - double bw_dbl, double bdp_dbl) { - int32_t bdp = (int32_t)GPR_CLAMP(bdp_dbl, 128.0, INT32_MAX); - int32_t target = (int32_t)GPR_MAX(bw_dbl / 1000, bdp); - // frame size is bounded [2^14,2^24-1] - int32_t frame_size = GPR_CLAMP(target, 16384, 16777215); - int64_t delta = (int64_t)frame_size - - (int64_t)t->settings[GRPC_LOCAL_SETTINGS] - [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE]; - if (delta == 0 || (delta > -frame_size / 10 && delta < frame_size / 10)) { - return; + if (action.send_setting_update != GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED) { + if (action.initial_window_size > 0) { + queue_setting_update(exec_ctx, t, + GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, + (uint32_t)action.initial_window_size); + } + if (action.max_frame_size > 0) { + queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE, + (uint32_t)action.max_frame_size); + } + if (action.send_setting_update == GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY) { + grpc_chttp2_initiate_write(exec_ctx, t, "immediate setting update"); + } } - if (GRPC_TRACER_ON(grpc_bdp_estimator_trace)) { - gpr_log(GPR_DEBUG, "%s: update max_frame size to %d", t->peer_string, - (int)frame_size); + if (action.need_ping) { + GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping"); + grpc_bdp_estimator_schedule_ping(&t->flow_control.bdp_estimator); + send_ping_locked(exec_ctx, t, + GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE, + &t->start_bdp_ping_locked, &t->finish_bdp_ping_locked); } - queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE, - (uint32_t)frame_size); } static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx, @@ -2339,7 +2320,6 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, GPR_TIMER_BEGIN("reading_action_locked", 0); grpc_chttp2_transport *t = tp; - bool need_bdp_ping = false; GRPC_ERROR_REF(error); @@ -2358,11 +2338,9 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, grpc_error *errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE, GRPC_ERROR_NONE}; for (; i < t->read_buffer.count && errors[1] == GRPC_ERROR_NONE; i++) { - if (grpc_bdp_estimator_add_incoming_bytes( - &t->bdp_estimator, - (int64_t)GRPC_SLICE_LENGTH(t->read_buffer.slices[i]))) { - need_bdp_ping = true; - } + grpc_bdp_estimator_add_incoming_bytes( + &t->flow_control.bdp_estimator, + (int64_t)GRPC_SLICE_LENGTH(t->read_buffer.slices[i])); errors[1] = grpc_chttp2_perform_read(exec_ctx, t, t->read_buffer.slices[i]); } @@ -2409,45 +2387,9 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, if (keep_reading) { grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer, &t->read_action_locked); - - if (t->enable_bdp_probe) { - if (need_bdp_ping) { - GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping"); - grpc_bdp_estimator_schedule_ping(&t->bdp_estimator); - send_ping_locked(exec_ctx, t, - GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE, - &t->start_bdp_ping_locked, &t->finish_bdp_ping_locked); - } - - int64_t estimate = -1; - double bdp_guess = -1; - if (grpc_bdp_estimator_get_estimate(&t->bdp_estimator, &estimate)) { - double target = 1 + log2((double)estimate); - double memory_pressure = grpc_resource_quota_get_memory_pressure( - grpc_resource_user_quota(grpc_endpoint_get_resource_user(t->ep))); - if (memory_pressure > 0.8) { - target *= 1 - GPR_MIN(1, (memory_pressure - 0.8) / 0.1); - } - double bdp_error = - target - grpc_pid_controller_last(&t->pid_controller); - gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); - gpr_timespec dt_timespec = gpr_time_sub(now, t->last_pid_update); - double dt = (double)dt_timespec.tv_sec + dt_timespec.tv_nsec * 1e-9; - if (dt > 0.1) { - dt = 0.1; - } - double log2_bdp_guess = - grpc_pid_controller_update(&t->pid_controller, bdp_error, dt); - bdp_guess = pow(2, log2_bdp_guess); - update_bdp(exec_ctx, t, bdp_guess); - t->last_pid_update = now; - } - - double bw = -1; - if (grpc_bdp_estimator_get_bw(&t->bdp_estimator, &bw)) { - update_frame(exec_ctx, t, bw, bdp_guess); - } - } + grpc_chttp2_act_on_flowctl_action( + exec_ctx, grpc_chttp2_flowctl_get_bdp_action(&t->flow_control), t, + NULL); GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keep_reading"); } else { GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "reading_action"); @@ -2470,7 +2412,7 @@ static void start_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) { grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer); } - grpc_bdp_estimator_start_ping(&t->bdp_estimator); + grpc_bdp_estimator_start_ping(&t->flow_control.bdp_estimator); } static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, @@ -2479,7 +2421,7 @@ static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, if (GRPC_TRACER_ON(grpc_http_trace)) { gpr_log(GPR_DEBUG, "%s: Complete BDP ping", t->peer_string); } - grpc_bdp_estimator_complete_ping(&t->bdp_estimator); + grpc_bdp_estimator_complete_ping(&t->flow_control.bdp_estimator); GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping"); } diff --git a/src/core/ext/transport/chttp2/transport/flow_control.c b/src/core/ext/transport/chttp2/transport/flow_control.c index c9f7eabd43..8dbdd1290a 100644 --- a/src/core/ext/transport/chttp2/transport/flow_control.c +++ b/src/core/ext/transport/chttp2/transport/flow_control.c @@ -18,6 +18,7 @@ #include "src/core/ext/transport/chttp2/transport/internal.h" +#include <math.h> #include <string.h> #include <grpc/support/alloc.h> @@ -39,6 +40,8 @@ typedef struct { int64_t remote_window_delta; int64_t local_window_delta; int64_t announced_window_delta; + uint32_t local_init_window; + uint32_t local_max_frame; } shadow_flow_control; static void pretrace(shadow_flow_control* shadow_fc, @@ -54,14 +57,28 @@ static void pretrace(shadow_flow_control* shadow_fc, } } -static char* fmt_str(int64_t old, int64_t new) { +#define TRACE_PADDING 30 + +static char* fmt_int64_diff_str(int64_t old, int64_t new) { char* str; if (old != new) { gpr_asprintf(&str, "%" PRId64 " -> %" PRId64 "", old, new); } else { gpr_asprintf(&str, "%" PRId64 "", old); } - char* str_lp = gpr_leftpad(str, ' ', 30); + char* str_lp = gpr_leftpad(str, ' ', TRACE_PADDING); + gpr_free(str); + return str_lp; +} + +static char* fmt_uint32_diff_str(uint32_t old, uint32_t new) { + char* str; + if (new > 0 && old != new) { + gpr_asprintf(&str, "%" PRIu32 " -> %" PRIu32 "", old, new); + } else { + gpr_asprintf(&str, "%" PRIu32 "", old); + } + char* str_lp = gpr_leftpad(str, ' ', TRACE_PADDING); gpr_free(str); return str_lp; } @@ -75,24 +92,28 @@ static void posttrace(shadow_flow_control* shadow_fc, uint32_t remote_window = tfc->t->settings[GRPC_PEER_SETTINGS] [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]; - char* trw_str = fmt_str(shadow_fc->remote_window, tfc->remote_window); - char* tlw_str = fmt_str(shadow_fc->target_window, - grpc_chttp2_target_announced_window(tfc)); - char* taw_str = fmt_str(shadow_fc->announced_window, tfc->announced_window); + char* trw_str = + fmt_int64_diff_str(shadow_fc->remote_window, tfc->remote_window); + char* tlw_str = fmt_int64_diff_str(shadow_fc->target_window, + grpc_chttp2_target_announced_window(tfc)); + char* taw_str = + fmt_int64_diff_str(shadow_fc->announced_window, tfc->announced_window); char* srw_str; char* slw_str; char* saw_str; if (sfc != NULL) { - srw_str = fmt_str(shadow_fc->remote_window_delta + remote_window, - sfc->remote_window_delta + remote_window); - slw_str = fmt_str(shadow_fc->local_window_delta + acked_local_window, - sfc->local_window_delta + acked_local_window); - saw_str = fmt_str(shadow_fc->announced_window_delta + acked_local_window, - sfc->announced_window_delta + acked_local_window); + srw_str = fmt_int64_diff_str(shadow_fc->remote_window_delta + remote_window, + sfc->remote_window_delta + remote_window); + slw_str = + fmt_int64_diff_str(shadow_fc->local_window_delta + acked_local_window, + sfc->local_window_delta + acked_local_window); + saw_str = fmt_int64_diff_str( + shadow_fc->announced_window_delta + acked_local_window, + sfc->announced_window_delta + acked_local_window); } else { - srw_str = gpr_leftpad("", ' ', 30); - slw_str = gpr_leftpad("", ' ', 30); - saw_str = gpr_leftpad("", ' ', 30); + srw_str = gpr_leftpad("", ' ', TRACE_PADDING); + slw_str = gpr_leftpad("", ' ', TRACE_PADDING); + saw_str = gpr_leftpad("", ' ', TRACE_PADDING); } gpr_log(GPR_DEBUG, "%p[%u][%s] | %s | trw:%s, ttw:%s, taw:%s, srw:%s, slw:%s, saw:%s", @@ -120,10 +141,21 @@ static char* urgency_to_string(grpc_chttp2_flowctl_urgency urgency) { GPR_UNREACHABLE_CODE(return "unknown"); } -static void trace_action(grpc_chttp2_flowctl_action action) { - gpr_log(GPR_DEBUG, "transport: %s, stream: %s", +static void trace_action(grpc_chttp2_transport_flowctl* tfc, + grpc_chttp2_flowctl_action action) { + char* iw_str = fmt_uint32_diff_str( + tfc->t->settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], + action.initial_window_size); + char* mf_str = fmt_uint32_diff_str( + tfc->t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE], + action.max_frame_size); + gpr_log(GPR_DEBUG, "t[%s], s[%s], settings[%s] iw:%s mf:%s", urgency_to_string(action.send_transport_update), - urgency_to_string(action.send_stream_update)); + urgency_to_string(action.send_stream_update), + urgency_to_string(action.send_setting_update), iw_str, mf_str); + gpr_free(iw_str); + gpr_free(mf_str); } #define PRETRACE(tfc, sfc) \ @@ -131,11 +163,12 @@ static void trace_action(grpc_chttp2_flowctl_action action) { GRPC_FLOW_CONTROL_IF_TRACING(pretrace(&shadow_fc, tfc, sfc)) #define POSTTRACE(tfc, sfc, reason) \ GRPC_FLOW_CONTROL_IF_TRACING(posttrace(&shadow_fc, tfc, sfc, reason)) -#define TRACEACTION(action) GRPC_FLOW_CONTROL_IF_TRACING(trace_action(action)) +#define TRACEACTION(tfc, action) \ + GRPC_FLOW_CONTROL_IF_TRACING(trace_action(tfc, action)) #else #define PRETRACE(tfc, sfc) #define POSTTRACE(tfc, sfc, reason) -#define TRACEACTION(action) +#define TRACEACTION(tfc, action) #endif /* How many bytes of incoming flow control would we like to advertise */ @@ -342,15 +375,58 @@ void grpc_chttp2_flowctl_destroy_stream(grpc_chttp2_transport_flowctl* tfc, announced_window_delta_preupdate(tfc, sfc); } +// Returns an urgency with which to make an update +static grpc_chttp2_flowctl_urgency delta_is_significant( + const grpc_chttp2_transport_flowctl* tfc, int32_t value, + grpc_chttp2_setting_id setting_id) { + int64_t delta = (int64_t)value - + (int64_t)tfc->t->settings[GRPC_LOCAL_SETTINGS][setting_id]; + // TODO(ncteisen): tune this + if (delta != 0 && (delta <= -value / 5 || delta >= value / 5)) { + return GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE; + } else { + return GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED; + } +} + +// Takes in a target and uses the pid controller to return a stabilized +// guess at the new bdp. +static double get_pid_controller_guess(grpc_chttp2_transport_flowctl* tfc, + double target) { + double bdp_error = target - grpc_pid_controller_last(&tfc->pid_controller); + gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + gpr_timespec dt_timespec = gpr_time_sub(now, tfc->last_pid_update); + double dt = (double)dt_timespec.tv_sec + dt_timespec.tv_nsec * 1e-9; + if (dt > 0.1) { + dt = 0.1; + } + double log2_bdp_guess = + grpc_pid_controller_update(&tfc->pid_controller, bdp_error, dt); + tfc->last_pid_update = now; + return pow(2, log2_bdp_guess); +} + +// Take in a target and modifies it based on the memory pressure of the system +static double get_target_under_memory_pressure( + grpc_chttp2_transport_flowctl* tfc, double target) { + // do not increase window under heavy memory pressure. + double memory_pressure = grpc_resource_quota_get_memory_pressure( + grpc_resource_user_quota(grpc_endpoint_get_resource_user(tfc->t->ep))); + if (memory_pressure > 0.8) { + target *= 1 - GPR_MIN(1, (memory_pressure - 0.8) / 0.1); + } + return target; +} + grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action( - const grpc_chttp2_transport_flowctl* tfc, - const grpc_chttp2_stream_flowctl* sfc) { + grpc_chttp2_transport_flowctl* tfc, grpc_chttp2_stream_flowctl* sfc) { grpc_chttp2_flowctl_action action; memset(&action, 0, sizeof(action)); uint32_t target_announced_window = grpc_chttp2_target_announced_window(tfc); if (tfc->announced_window < target_announced_window / 2) { action.send_transport_update = GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY; } + // TODO(ncteisen): tune this if (sfc != NULL && !sfc->s->read_closed) { uint32_t sent_init_window = tfc->t->settings[GRPC_SENT_SETTINGS] @@ -364,6 +440,61 @@ grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action( action.send_stream_update = GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE; } } - TRACEACTION(action); + TRACEACTION(tfc, action); + return action; +} + +grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_bdp_action( + grpc_chttp2_transport_flowctl* tfc) { + grpc_chttp2_flowctl_action action; + memset(&action, 0, sizeof(action)); + if (tfc->enable_bdp_probe) { + action.need_ping = grpc_bdp_estimator_need_ping(&tfc->bdp_estimator); + + // get bdp estimate and update initial_window accordingly. + int64_t estimate = -1; + int32_t bdp = -1; + if (grpc_bdp_estimator_get_estimate(&tfc->bdp_estimator, &estimate)) { + double target = 1 + log2((double)estimate); + + // target might change based on how much memory pressure we are under + // TODO(ncteisen): experiment with setting target to be huge under low + // memory pressure. + target = get_target_under_memory_pressure(tfc, target); + + // run our target through the pid controller to stabilize change. + // TODO(ncteisen): experiment with other controllers here. + double bdp_guess = get_pid_controller_guess(tfc, target); + + // Though initial window 'could' drop to 0, we keep the floor at 128 + bdp = GPR_MAX((int32_t)bdp_guess, 128); + + grpc_chttp2_flowctl_urgency init_window_update_urgency = + delta_is_significant(tfc, bdp, + GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE); + if (init_window_update_urgency != GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED) { + action.send_setting_update = init_window_update_urgency; + action.initial_window_size = (uint32_t)bdp; + } + } + + // get bandwidth estimate and update max_frame accordingly. + double bw_dbl = -1; + if (grpc_bdp_estimator_get_bw(&tfc->bdp_estimator, &bw_dbl)) { + // we target the max of BDP or bandwidth in microseconds. + int32_t frame_size = + GPR_CLAMP(GPR_MAX((int32_t)bw_dbl / 1000, bdp), 16384, 16777215); + grpc_chttp2_flowctl_urgency frame_size_urgency = delta_is_significant( + tfc, frame_size, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE); + if (frame_size_urgency != GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED) { + if (frame_size_urgency > action.send_setting_update) { + action.send_setting_update = frame_size_urgency; + } + action.max_frame_size = (uint32_t)frame_size; + } + } + } + + TRACEACTION(tfc, action); return action; } diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index f26f14dbec..3c41a8958f 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -238,7 +238,17 @@ typedef struct { * to send WINDOW_UPDATE frames. */ int64_t announced_window; - // read only pointer back to transport for certain data + /** should we probe bdp? */ + bool enable_bdp_probe; + + /* bdp estimation */ + grpc_bdp_estimator bdp_estimator; + + /* pid controller */ + grpc_pid_controller pid_controller; + gpr_timespec last_pid_update; + + // pointer back to transport for tracing const grpc_chttp2_transport *t; } grpc_chttp2_transport_flowctl; @@ -261,9 +271,6 @@ struct grpc_chttp2_transport { /** is there a read request to the endpoint outstanding? */ uint8_t endpoint_reading; - /** should we probe bdp? */ - bool enable_bdp_probe; - grpc_chttp2_optimization_target opt_target; /** various lists of streams */ @@ -358,13 +365,6 @@ struct grpc_chttp2_transport { grpc_chttp2_transport_flowctl flow_control; - /* bdp estimation */ - grpc_bdp_estimator bdp_estimator; - - /* pid controller */ - grpc_pid_controller pid_controller; - gpr_timespec last_pid_update; - /* deframing */ grpc_chttp2_deframe_transport_state deframe_state; uint8_t incoming_frame_type; @@ -704,13 +704,19 @@ typedef enum { typedef struct { grpc_chttp2_flowctl_urgency send_stream_update; grpc_chttp2_flowctl_urgency send_transport_update; + grpc_chttp2_flowctl_urgency send_setting_update; + uint32_t initial_window_size; + uint32_t max_frame_size; + bool need_ping; } grpc_chttp2_flowctl_action; // Reads the flow control data and returns and actionable struct that will tell // chttp2 exactly what it needs to do grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_action( - const grpc_chttp2_transport_flowctl *tfc, - const grpc_chttp2_stream_flowctl *sfc); + grpc_chttp2_transport_flowctl *tfc, grpc_chttp2_stream_flowctl *sfc); + +grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_bdp_action( + grpc_chttp2_transport_flowctl *tfc); // Takes in a flow control action and performs all the needed operations. void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/iomgr/ev_epoll1_linux.c b/src/core/lib/iomgr/ev_epoll1_linux.c index 90e0ce36cd..6b034ca960 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.c +++ b/src/core/lib/iomgr/ev_epoll1_linux.c @@ -406,7 +406,14 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { gpr_mu_init(&pollset->mu); *mu = &pollset->mu; pollset->neighbourhood = &g_neighbourhoods[choose_neighbourhood()]; + pollset->reassigning_neighbourhood = false; + pollset->root_worker = NULL; + pollset->kicked_without_poller = false; pollset->seen_inactive = true; + pollset->shutting_down = false; + pollset->shutdown_closure = NULL; + pollset->begin_refs = 0; + pollset->next = pollset->prev = NULL; } static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { @@ -1043,8 +1050,6 @@ static const grpc_event_engine_vtable vtable = { /* It is possible that GLIBC has epoll but the underlying kernel doesn't. * Create a dummy epoll_fd to make sure epoll support is available */ const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request) { - /* TODO(sreek): Temporarily disable this poller unless explicitly requested - * via GRPC_POLL_STRATEGY */ if (!explicit_request) { return NULL; } diff --git a/src/core/lib/security/credentials/jwt/jwt_verifier.c b/src/core/lib/security/credentials/jwt/jwt_verifier.c index 6cd558d123..a27284bc50 100644 --- a/src/core/lib/security/credentials/jwt/jwt_verifier.c +++ b/src/core/lib/security/credentials/jwt/jwt_verifier.c @@ -442,7 +442,7 @@ static EVP_PKEY *extract_pkey_from_x509(const char *x509_str) { end: BIO_free(bio); - if (x509 != NULL) X509_free(x509); + X509_free(x509); return result; } @@ -496,6 +496,8 @@ static EVP_PKEY *pkey_from_jwk(grpc_exec_ctx *exec_ctx, const grpc_json *json, const grpc_json *key_prop; RSA *rsa = NULL; EVP_PKEY *result = NULL; + BIGNUM *tmp_n = NULL; + BIGNUM *tmp_e = NULL; GPR_ASSERT(kty != NULL && json != NULL); if (strcmp(kty, "RSA") != 0) { @@ -507,8 +509,6 @@ static EVP_PKEY *pkey_from_jwk(grpc_exec_ctx *exec_ctx, const grpc_json *json, gpr_log(GPR_ERROR, "Could not create rsa key."); goto end; } - BIGNUM *tmp_n = NULL; - BIGNUM *tmp_e = NULL; for (key_prop = json->child; key_prop != NULL; key_prop = key_prop->next) { if (strcmp(key_prop->key, "n") == 0) { tmp_n = @@ -528,11 +528,16 @@ static EVP_PKEY *pkey_from_jwk(grpc_exec_ctx *exec_ctx, const grpc_json *json, gpr_log(GPR_ERROR, "Cannot set RSA key from inputs."); goto end; } + /* RSA_set0_key takes ownership on success. */ + tmp_n = NULL; + tmp_e = NULL; result = EVP_PKEY_new(); EVP_PKEY_set1_RSA(result, rsa); /* uprefs rsa. */ end: - if (rsa != NULL) RSA_free(rsa); + RSA_free(rsa); + BN_free(tmp_n); + BN_free(tmp_e); return result; } @@ -618,7 +623,7 @@ static int verify_jwt_signature(EVP_PKEY *key, const char *alg, result = 1; end: - if (md_ctx != NULL) EVP_MD_CTX_destroy(md_ctx); + EVP_MD_CTX_destroy(md_ctx); return result; } @@ -658,7 +663,7 @@ static void on_keys_retrieved(grpc_exec_ctx *exec_ctx, void *user_data, end: if (json != NULL) grpc_json_destroy(json); - if (verification_key != NULL) EVP_PKEY_free(verification_key); + EVP_PKEY_free(verification_key); ctx->user_cb(exec_ctx, ctx->user_data, status, claims); verifier_cb_ctx_destroy(exec_ctx, ctx); } diff --git a/src/core/lib/security/credentials/oauth2/oauth2_credentials.c b/src/core/lib/security/credentials/oauth2/oauth2_credentials.c index c59e55136c..10b270c49c 100644 --- a/src/core/lib/security/credentials/oauth2/oauth2_credentials.c +++ b/src/core/lib/security/credentials/oauth2/oauth2_credentials.c @@ -296,10 +296,10 @@ static bool oauth2_token_fetcher_get_request_metadata( gpr_mu_unlock(&c->mu); if (start_fetch) { grpc_call_credentials_ref(creds); - c->fetch_func(exec_ctx, grpc_credentials_metadata_request_create(creds), - &c->httpcli_context, &c->pollent, - on_oauth2_token_fetcher_http_response, - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), refresh_threshold)); + c->fetch_func( + exec_ctx, grpc_credentials_metadata_request_create(creds), + &c->httpcli_context, &c->pollent, on_oauth2_token_fetcher_http_response, + gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), refresh_threshold)); } return false; } diff --git a/src/core/lib/surface/version.c b/src/core/lib/surface/version.c index 8cef15da80..96c16105e7 100644 --- a/src/core/lib/surface/version.c +++ b/src/core/lib/surface/version.c @@ -23,4 +23,4 @@ const char *grpc_version_string(void) { return "4.0.0-dev"; } -const char *grpc_g_stands_for(void) { return "gregarious"; } +const char *grpc_g_stands_for(void) { return "gambit"; } diff --git a/src/core/lib/transport/bdp_estimator.c b/src/core/lib/transport/bdp_estimator.c index 311ae6390d..8b57693413 100644 --- a/src/core/lib/transport/bdp_estimator.c +++ b/src/core/lib/transport/bdp_estimator.c @@ -33,20 +33,24 @@ void grpc_bdp_estimator_init(grpc_bdp_estimator *estimator, const char *name) { estimator->bw_est = 0; } -bool grpc_bdp_estimator_get_estimate(grpc_bdp_estimator *estimator, +bool grpc_bdp_estimator_get_estimate(const grpc_bdp_estimator *estimator, int64_t *estimate) { *estimate = estimator->estimate; return true; } -bool grpc_bdp_estimator_get_bw(grpc_bdp_estimator *estimator, double *bw) { +bool grpc_bdp_estimator_get_bw(const grpc_bdp_estimator *estimator, + double *bw) { *bw = estimator->bw_est; return true; } -bool grpc_bdp_estimator_add_incoming_bytes(grpc_bdp_estimator *estimator, +void grpc_bdp_estimator_add_incoming_bytes(grpc_bdp_estimator *estimator, int64_t num_bytes) { estimator->accumulator += num_bytes; +} + +bool grpc_bdp_estimator_need_ping(const grpc_bdp_estimator *estimator) { switch (estimator->ping_state) { case GRPC_BDP_PING_UNSCHEDULED: return true; diff --git a/src/core/lib/transport/bdp_estimator.h b/src/core/lib/transport/bdp_estimator.h index a232d1f87f..1ef0dc99dd 100644 --- a/src/core/lib/transport/bdp_estimator.h +++ b/src/core/lib/transport/bdp_estimator.h @@ -47,13 +47,15 @@ typedef struct grpc_bdp_estimator { void grpc_bdp_estimator_init(grpc_bdp_estimator *estimator, const char *name); // Returns true if a reasonable estimate could be obtained -bool grpc_bdp_estimator_get_estimate(grpc_bdp_estimator *estimator, +bool grpc_bdp_estimator_get_estimate(const grpc_bdp_estimator *estimator, int64_t *estimate); -// Returns true if a reasonable estimate could be obtained -bool grpc_bdp_estimator_get_bw(grpc_bdp_estimator *estimator, double *bw); +// Tracks new bytes read. +bool grpc_bdp_estimator_get_bw(const grpc_bdp_estimator *estimator, double *bw); // Returns true if the user should schedule a ping -bool grpc_bdp_estimator_add_incoming_bytes(grpc_bdp_estimator *estimator, +void grpc_bdp_estimator_add_incoming_bytes(grpc_bdp_estimator *estimator, int64_t num_bytes); +// Returns true if the user should schedule a ping +bool grpc_bdp_estimator_need_ping(const grpc_bdp_estimator *estimator); // Schedule a ping: call in response to receiving a true from // grpc_bdp_estimator_add_incoming_bytes once a ping has been scheduled by a // transport (but not necessarily started) |