diff options
Diffstat (limited to 'src/core/ext/transport/chttp2')
7 files changed, 266 insertions, 103 deletions
diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c index d3e53984f2..d8c18eb122 100644 --- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c +++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c @@ -40,10 +40,15 @@ #include "src/core/ext/client_channel/client_channel.h" #include "src/core/ext/client_channel/resolver_registry.h" +#include "src/core/ext/client_channel/uri_parser.h" #include "src/core/ext/transport/chttp2/client/chttp2_connector.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/security/credentials/credentials.h" +#include "src/core/lib/security/transport/lb_targets_info.h" #include "src/core/lib/security/transport/security_connector.h" +#include "src/core/lib/slice/slice_hash_table.h" +#include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/channel.h" @@ -53,12 +58,114 @@ static void client_channel_factory_ref( static void client_channel_factory_unref( grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory) {} +static grpc_subchannel_args *get_secure_naming_subchannel_args( + grpc_exec_ctx *exec_ctx, const grpc_subchannel_args *args) { + grpc_channel_credentials *channel_credentials = + grpc_channel_credentials_find_in_args(args->args); + if (channel_credentials == NULL) { + gpr_log(GPR_ERROR, + "Can't create subchannel: channel credentials missing for secure " + "channel."); + return NULL; + } + // Make sure security connector does not already exist in args. + if (grpc_security_connector_find_in_args(args->args) != NULL) { + gpr_log(GPR_ERROR, + "Can't create subchannel: security connector already present in " + "channel args."); + return NULL; + } + // To which address are we connecting? By default, use the server URI. + const grpc_arg *server_uri_arg = + grpc_channel_args_find(args->args, GRPC_ARG_SERVER_URI); + GPR_ASSERT(server_uri_arg != NULL); + GPR_ASSERT(server_uri_arg->type == GRPC_ARG_STRING); + const char *server_uri_str = server_uri_arg->value.string; + GPR_ASSERT(server_uri_str != NULL); + grpc_uri *server_uri = + grpc_uri_parse(server_uri_str, true /* supress errors */); + GPR_ASSERT(server_uri != NULL); + const char *server_uri_path; + server_uri_path = + server_uri->path[0] == '/' ? server_uri->path + 1 : server_uri->path; + const grpc_slice_hash_table *targets_info = + grpc_lb_targets_info_find_in_args(args->args); + char *target_name_to_check = NULL; + if (targets_info != NULL) { // LB channel + // Find the balancer name for the target. + const char *target_uri_str = + grpc_get_subchannel_address_uri_arg(args->args); + grpc_uri *target_uri = + grpc_uri_parse(target_uri_str, false /* suppress errors */); + GPR_ASSERT(target_uri != NULL); + if (target_uri->path[0] != '\0') { // "path" may be empty + const grpc_slice key = grpc_slice_from_static_string( + target_uri->path[0] == '/' ? target_uri->path + 1 : target_uri->path); + const char *value = grpc_slice_hash_table_get(targets_info, key); + if (value != NULL) target_name_to_check = gpr_strdup(value); + grpc_slice_unref_internal(exec_ctx, key); + } + if (target_name_to_check == NULL) { + // If the target name to check hasn't already been set, fall back to using + // SERVER_URI + target_name_to_check = gpr_strdup(server_uri_path); + } + grpc_uri_destroy(target_uri); + } else { // regular channel: the secure name is the original server URI. + target_name_to_check = gpr_strdup(server_uri_path); + } + grpc_uri_destroy(server_uri); + GPR_ASSERT(target_name_to_check != NULL); + grpc_channel_security_connector *subchannel_security_connector = NULL; + // Create the security connector using the credentials and target name. + grpc_channel_args *new_args_from_connector = NULL; + const grpc_security_status security_status = + grpc_channel_credentials_create_security_connector( + exec_ctx, channel_credentials, target_name_to_check, args->args, + &subchannel_security_connector, &new_args_from_connector); + if (security_status != GRPC_SECURITY_OK) { + gpr_log(GPR_ERROR, + "Failed to create secure subchannel for secure name '%s'", + target_name_to_check); + gpr_free(target_name_to_check); + return NULL; + } + gpr_free(target_name_to_check); + grpc_arg new_security_connector_arg = + grpc_security_connector_to_arg(&subchannel_security_connector->base); + + grpc_channel_args *new_args = grpc_channel_args_copy_and_add( + new_args_from_connector != NULL ? new_args_from_connector : args->args, + &new_security_connector_arg, 1); + GRPC_SECURITY_CONNECTOR_UNREF(exec_ctx, &subchannel_security_connector->base, + "lb_channel_create"); + if (new_args_from_connector != NULL) { + grpc_channel_args_destroy(exec_ctx, new_args_from_connector); + } + grpc_subchannel_args *final_sc_args = gpr_malloc(sizeof(*final_sc_args)); + memcpy(final_sc_args, args, sizeof(*args)); + final_sc_args->args = new_args; + return final_sc_args; +} + static grpc_subchannel *client_channel_factory_create_subchannel( grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory, const grpc_subchannel_args *args) { + grpc_subchannel_args *subchannel_args = + get_secure_naming_subchannel_args(exec_ctx, args); + if (subchannel_args == NULL) { + gpr_log( + GPR_ERROR, + "Failed to create subchannel arguments during subchannel creation."); + return NULL; + } grpc_connector *connector = grpc_chttp2_connector_create(); - grpc_subchannel *s = grpc_subchannel_create(exec_ctx, connector, args); + grpc_subchannel *s = + grpc_subchannel_create(exec_ctx, connector, subchannel_args); grpc_connector_unref(exec_ctx, connector); + grpc_channel_args_destroy(exec_ctx, + (grpc_channel_args *)subchannel_args->args); + gpr_free(subchannel_args); return s; } @@ -91,10 +198,10 @@ static const grpc_client_channel_factory_vtable client_channel_factory_vtable = static grpc_client_channel_factory client_channel_factory = { &client_channel_factory_vtable}; -/* Create a secure client channel: - Asynchronously: - resolve target - - connect to it (trying alternatives as presented) - - perform handshakes */ +// Create a secure client channel: +// Asynchronously: - resolve target +// - connect to it (trying alternatives as presented) +// - perform handshakes grpc_channel *grpc_secure_channel_create(grpc_channel_credentials *creds, const char *target, const grpc_channel_args *args, @@ -103,47 +210,25 @@ grpc_channel *grpc_secure_channel_create(grpc_channel_credentials *creds, GRPC_API_TRACE( "grpc_secure_channel_create(creds=%p, target=%s, args=%p, " "reserved=%p)", - 4, (creds, target, args, reserved)); + 4, ((void *)creds, target, (void *)args, (void *)reserved)); GPR_ASSERT(reserved == NULL); - // Make sure security connector does not already exist in args. - if (grpc_find_security_connector_in_args(args) != NULL) { - gpr_log(GPR_ERROR, "Cannot set security context in channel args."); + grpc_channel *channel = NULL; + if (creds != NULL) { + // Add channel args containing the client channel factory and channel + // credentials. + grpc_arg args_to_add[] = { + grpc_client_channel_factory_create_channel_arg(&client_channel_factory), + grpc_channel_credentials_to_arg(creds)}; + grpc_channel_args *new_args = grpc_channel_args_copy_and_add( + args, args_to_add, GPR_ARRAY_SIZE(args_to_add)); + // Create channel. + channel = client_channel_factory_create_channel( + &exec_ctx, &client_channel_factory, target, + GRPC_CLIENT_CHANNEL_TYPE_REGULAR, new_args); + // Clean up. + grpc_channel_args_destroy(&exec_ctx, new_args); grpc_exec_ctx_finish(&exec_ctx); - return grpc_lame_client_channel_create( - target, GRPC_STATUS_INTERNAL, - "Security connector exists in channel args."); - } - // Create security connector and construct new channel args. - grpc_channel_security_connector *security_connector; - grpc_channel_args *new_args_from_connector; - if (grpc_channel_credentials_create_security_connector( - &exec_ctx, creds, target, args, &security_connector, - &new_args_from_connector) != GRPC_SECURITY_OK) { - grpc_exec_ctx_finish(&exec_ctx); - return grpc_lame_client_channel_create( - target, GRPC_STATUS_INTERNAL, "Failed to create security connector."); - } - // Add channel args containing the client channel factory and security - // connector. - grpc_arg args_to_add[2]; - args_to_add[0] = - grpc_client_channel_factory_create_channel_arg(&client_channel_factory); - args_to_add[1] = grpc_security_connector_to_arg(&security_connector->base); - grpc_channel_args *new_args = grpc_channel_args_copy_and_add( - new_args_from_connector != NULL ? new_args_from_connector : args, - args_to_add, GPR_ARRAY_SIZE(args_to_add)); - if (new_args_from_connector != NULL) { - grpc_channel_args_destroy(&exec_ctx, new_args_from_connector); } - // Create channel. - grpc_channel *channel = client_channel_factory_create_channel( - &exec_ctx, &client_channel_factory, target, - GRPC_CLIENT_CHANNEL_TYPE_REGULAR, new_args); - // Clean up. - GRPC_SECURITY_CONNECTOR_UNREF(&exec_ctx, &security_connector->base, - "secure_client_channel_factory_create_channel"); - grpc_channel_args_destroy(&exec_ctx, new_args); - grpc_exec_ctx_finish(&exec_ctx); return channel != NULL ? channel : grpc_lame_client_channel_create( target, GRPC_STATUS_INTERNAL, diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index fa18f5a725..d1fab25478 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -168,7 +168,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream_map_destroy(&t->stream_map); grpc_connectivity_state_destroy(exec_ctx, &t->channel_callback.state_tracker); - grpc_combiner_destroy(exec_ctx, t->combiner); + GRPC_COMBINER_UNREF(exec_ctx, t->combiner, "chttp2_transport"); cancel_pings(exec_ctx, t, GRPC_ERROR_CREATE("Transport destroyed")); @@ -265,7 +265,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, .gain_d = 0, .initial_control_value = log2(DEFAULT_WINDOW), .min_control_value = -1, - .max_control_value = 22, + .max_control_value = 25, .integral_range = 10}); grpc_chttp2_goaway_parser_init(&t->goaway_parser); @@ -292,6 +292,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE; t->sent_local_settings = 0; t->write_buffer_size = DEFAULT_WINDOW; + t->enable_bdp_probe = true; if (is_client) { grpc_slice_buffer_add(&t->outbuf, grpc_slice_from_copied_string( @@ -358,6 +359,10 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->write_buffer_size = (uint32_t)grpc_channel_arg_get_integer( &channel_args->args[i], (grpc_integer_options){0, 0, MAX_WRITE_BUFFER_SIZE}); + } else if (0 == + strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_BDP_PROBE)) { + t->enable_bdp_probe = grpc_channel_arg_get_integer( + &channel_args->args[i], (grpc_integer_options){1, 0, 1}); } else { static const struct { const char *channel_arg_name; @@ -564,6 +569,14 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, GRPC_ERROR_UNREF(s->read_closed_error); GRPC_ERROR_UNREF(s->write_closed_error); + if (s->incoming_window_delta > 0) { + GRPC_CHTTP2_FLOW_DEBIT_STREAM_INCOMING_WINDOW_DELTA( + "destroy", t, s, s->incoming_window_delta); + } else if (s->incoming_window_delta < 0) { + GRPC_CHTTP2_FLOW_CREDIT_STREAM_INCOMING_WINDOW_DELTA( + "destroy", t, s, -s->incoming_window_delta); + } + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "stream"); GPR_TIMER_END("destroy_stream", 0); @@ -1028,8 +1041,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, GPR_TIMER_BEGIN("perform_stream_op_locked", 0); grpc_transport_stream_op *op = stream_op; - grpc_chttp2_transport *t = op->transport_private.args[0]; - grpc_chttp2_stream *s = op->transport_private.args[1]; + grpc_chttp2_transport *t = op->handler_private.args[0]; + grpc_chttp2_stream *s = op->handler_private.args[1]; if (grpc_http_trace) { char *str = grpc_transport_stream_op_string(op); @@ -1250,13 +1263,13 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, gpr_free(str); } - op->transport_private.args[0] = gt; - op->transport_private.args[1] = gs; + op->handler_private.args[0] = gt; + op->handler_private.args[1] = gs; GRPC_CHTTP2_STREAM_REF(s, "perform_stream_op"); grpc_closure_sched( exec_ctx, grpc_closure_init( - &op->transport_private.closure, perform_stream_op_locked, op, + &op->handler_private.closure, perform_stream_op_locked, op, grpc_combiner_scheduler(t->combiner, op->covered_by_poller)), GRPC_ERROR_NONE); GPR_TIMER_END("perform_stream_op", 0); @@ -1796,13 +1809,13 @@ static void update_bdp(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, if (delta == 0 || (bdp != 0 && delta > -1024 && delta < 1024)) { return; } + if (grpc_bdp_estimator_trace) { + gpr_log(GPR_DEBUG, "%s: update initial window size to %d", t->peer_string, + (int)bdp); + } push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, bdp); } -/******************************************************************************* - * INPUT PROCESSING - PARSING - */ - static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { grpc_http_parser parser; @@ -1908,33 +1921,36 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer, &t->read_action_locked); - if (need_bdp_ping) { - GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping"); - grpc_bdp_estimator_schedule_ping(&t->bdp_estimator); - send_ping_locked(exec_ctx, t, - GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE, - &t->start_bdp_ping_locked, &t->finish_bdp_ping_locked); - } - - int64_t estimate = -1; - if (grpc_bdp_estimator_get_estimate(&t->bdp_estimator, &estimate)) { - double target = 1 + log2((double)estimate); - double memory_pressure = grpc_resource_quota_get_memory_pressure( - grpc_resource_user_quota(grpc_endpoint_get_resource_user(t->ep))); - if (memory_pressure > 0.8) { - target *= 1 - GPR_MIN(1, (memory_pressure - 0.8) / 0.1); + if (t->enable_bdp_probe) { + if (need_bdp_ping) { + GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping"); + grpc_bdp_estimator_schedule_ping(&t->bdp_estimator); + send_ping_locked(exec_ctx, t, + GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE, + &t->start_bdp_ping_locked, &t->finish_bdp_ping_locked); } - double bdp_error = target - grpc_pid_controller_last(&t->pid_controller); - gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); - gpr_timespec dt_timespec = gpr_time_sub(now, t->last_pid_update); - double dt = (double)dt_timespec.tv_sec + dt_timespec.tv_nsec * 1e-9; - if (dt > 0.1) { - dt = 0.1; + + int64_t estimate = -1; + if (grpc_bdp_estimator_get_estimate(&t->bdp_estimator, &estimate)) { + double target = 1 + log2((double)estimate); + double memory_pressure = grpc_resource_quota_get_memory_pressure( + grpc_resource_user_quota(grpc_endpoint_get_resource_user(t->ep))); + if (memory_pressure > 0.8) { + target *= 1 - GPR_MIN(1, (memory_pressure - 0.8) / 0.1); + } + double bdp_error = + target - grpc_pid_controller_last(&t->pid_controller); + gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + gpr_timespec dt_timespec = gpr_time_sub(now, t->last_pid_update); + double dt = (double)dt_timespec.tv_sec + dt_timespec.tv_nsec * 1e-9; + if (dt > 0.1) { + dt = 0.1; + } + double log2_bdp_guess = + grpc_pid_controller_update(&t->pid_controller, bdp_error, dt); + update_bdp(exec_ctx, t, pow(2, log2_bdp_guess)); + t->last_pid_update = now; } - double log2_bdp_guess = - grpc_pid_controller_update(&t->pid_controller, bdp_error, dt); - update_bdp(exec_ctx, t, pow(2, log2_bdp_guess)); - t->last_pid_update = now; } GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keep_reading"); } else { @@ -2046,8 +2062,8 @@ static void incoming_byte_stream_update_flow_control(grpc_exec_ctx *exec_ctx, (int64_t)have_already) { write_type = GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED; } - GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", t, s, incoming_window_delta, - add_max_recv_bytes); + GRPC_CHTTP2_FLOW_CREDIT_STREAM_INCOMING_WINDOW_DELTA("op", t, s, + add_max_recv_bytes); GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", t, s, announce_window, add_max_recv_bytes); if ((int64_t)s->incoming_window_delta + (int64_t)initial_window_size - diff --git a/src/core/ext/transport/chttp2/transport/frame_rst_stream.c b/src/core/ext/transport/chttp2/transport/frame_rst_stream.c index 7d5beed09d..cb017e75f0 100644 --- a/src/core/ext/transport/chttp2/transport/frame_rst_stream.c +++ b/src/core/ext/transport/chttp2/transport/frame_rst_stream.c @@ -109,8 +109,13 @@ grpc_error *grpc_chttp2_rst_stream_parser_parse(grpc_exec_ctx *exec_ctx, (((uint32_t)p->reason_bytes[3])); grpc_error *error = GRPC_ERROR_NONE; if (reason != GRPC_HTTP2_NO_ERROR || s->header_frames_received < 2) { - error = grpc_error_set_int(GRPC_ERROR_CREATE("RST_STREAM"), - GRPC_ERROR_INT_HTTP2_ERROR, (intptr_t)reason); + char *message; + gpr_asprintf(&message, "Received RST_STREAM with error code %d", reason); + error = grpc_error_set_int( + grpc_error_set_str(GRPC_ERROR_CREATE("RST_STREAM"), + GRPC_ERROR_STR_GRPC_MESSAGE, message), + GRPC_ERROR_INT_HTTP2_ERROR, (intptr_t)reason); + gpr_free(message); } grpc_chttp2_mark_stream_closed(exec_ctx, t, s, true, true, error); } diff --git a/src/core/ext/transport/chttp2/transport/frame_settings.h b/src/core/ext/transport/chttp2/transport/frame_settings.h index a29dc82106..44137798c0 100644 --- a/src/core/ext/transport/chttp2/transport/frame_settings.h +++ b/src/core/ext/transport/chttp2/transport/frame_settings.h @@ -87,7 +87,7 @@ extern const grpc_chttp2_setting_parameters grpc_chttp2_settings_parameters[GRPC_CHTTP2_NUM_SETTINGS]; /* Create a settings frame by diffing old & new, and updating old to be new */ -grpc_slice grpc_chttp2_settings_create(uint32_t *old, const uint32_t *new, +grpc_slice grpc_chttp2_settings_create(uint32_t *old, const uint32_t *newval, uint32_t force_mask, size_t count); /* Create an ack settings frame */ grpc_slice grpc_chttp2_settings_ack_create(void); diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 1dabf9edba..5d41f4bfda 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -227,6 +227,9 @@ struct grpc_chttp2_transport { /** is there a read request to the endpoint outstanding? */ uint8_t endpoint_reading; + /** should we probe bdp? */ + bool enable_bdp_probe; + /** various lists of streams */ grpc_chttp2_stream_list lists[STREAM_LIST_COUNT]; @@ -268,10 +271,6 @@ struct grpc_chttp2_transport { /** data to write next write */ grpc_slice_buffer qbuf; - /** window available to announce to peer */ - int64_t announce_incoming_window; - /** how much window would we like to have for incoming_window */ - uint32_t connection_window_target; /** how much data are we willing to buffer when the WRITE_BUFFER_HINT is set? */ uint32_t write_buffer_size; @@ -325,6 +324,16 @@ struct grpc_chttp2_transport { /** window available for peer to send to us */ int64_t incoming_window; + /** calculating what we should give for incoming window: + we track the total amount of flow control over initial window size + across all streams: this is data that we want to receive right now (it + has an outstanding read) + and the total amount of flow control under initial window size across all + streams: this is data we've read early + we want to adjust incoming_window such that: + incoming_window = total_over - max(bdp - total_under, 0) */ + int64_t stream_total_over_incoming_window; + int64_t stream_total_under_incoming_window; /* deframing */ grpc_chttp2_deframe_transport_state deframe_state; @@ -631,6 +640,44 @@ typedef enum { GRPC_CHTTP2_FLOW_CREDIT_COMMON(phase, dst_context, 0, dst_context, dst_var, \ amount) +#define GRPC_CHTTP2_FLOW_STREAM_INCOMING_WINDOW_DELTA_PREUPDATE( \ + phase, transport, dst_context) \ + if (dst_context->incoming_window_delta < 0) { \ + transport->stream_total_under_incoming_window += \ + dst_context->incoming_window_delta; \ + } else if (dst_context->incoming_window_delta > 0) { \ + transport->stream_total_over_incoming_window -= \ + dst_context->incoming_window_delta; \ + } + +#define GRPC_CHTTP2_FLOW_STREAM_INCOMING_WINDOW_DELTA_POSTUPDATE( \ + phase, transport, dst_context) \ + if (dst_context->incoming_window_delta < 0) { \ + transport->stream_total_under_incoming_window -= \ + dst_context->incoming_window_delta; \ + } else if (dst_context->incoming_window_delta > 0) { \ + transport->stream_total_over_incoming_window += \ + dst_context->incoming_window_delta; \ + } + +#define GRPC_CHTTP2_FLOW_DEBIT_STREAM_INCOMING_WINDOW_DELTA( \ + phase, transport, dst_context, amount) \ + GRPC_CHTTP2_FLOW_STREAM_INCOMING_WINDOW_DELTA_PREUPDATE(phase, transport, \ + dst_context); \ + GRPC_CHTTP2_FLOW_DEBIT_STREAM(phase, transport, dst_context, \ + incoming_window_delta, amount); \ + GRPC_CHTTP2_FLOW_STREAM_INCOMING_WINDOW_DELTA_POSTUPDATE(phase, transport, \ + dst_context); + +#define GRPC_CHTTP2_FLOW_CREDIT_STREAM_INCOMING_WINDOW_DELTA( \ + phase, transport, dst_context, amount) \ + GRPC_CHTTP2_FLOW_STREAM_INCOMING_WINDOW_DELTA_PREUPDATE(phase, transport, \ + dst_context); \ + GRPC_CHTTP2_FLOW_CREDIT_STREAM(phase, transport, dst_context, \ + incoming_window_delta, amount); \ + GRPC_CHTTP2_FLOW_STREAM_INCOMING_WINDOW_DELTA_POSTUPDATE(phase, transport, \ + dst_context); + #define GRPC_CHTTP2_FLOW_DEBIT_COMMON(phase, transport, id, dst_context, \ dst_var, amount) \ do { \ @@ -749,4 +796,6 @@ void grpc_chttp2_fail_pending_writes(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_error *error); +uint32_t grpc_chttp2_target_incoming_window(grpc_chttp2_transport *t); + #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H */ diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c index 24bd93067b..7ed00522c3 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.c +++ b/src/core/ext/transport/chttp2/transport/parsing.c @@ -376,15 +376,6 @@ static grpc_error *update_incoming_window(grpc_exec_ctx *exec_ctx, return err; } - uint32_t target_incoming_window = GPR_MAX( - t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], - 1024); - GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("parse", t, incoming_window, - incoming_frame_size); - if (t->incoming_window <= target_incoming_window / 2) { - grpc_chttp2_initiate_write(exec_ctx, t, false, "flow_control"); - } - if (s != NULL) { if (incoming_frame_size > s->incoming_window_delta + @@ -402,8 +393,8 @@ static grpc_error *update_incoming_window(grpc_exec_ctx *exec_ctx, return err; } - GRPC_CHTTP2_FLOW_DEBIT_STREAM("parse", t, s, incoming_window_delta, - incoming_frame_size); + GRPC_CHTTP2_FLOW_DEBIT_STREAM_INCOMING_WINDOW_DELTA("parse", t, s, + incoming_frame_size); if ((int64_t)t->settings[GRPC_SENT_SETTINGS] [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] + (int64_t)s->incoming_window_delta - (int64_t)s->announce_window <= @@ -417,6 +408,13 @@ static grpc_error *update_incoming_window(grpc_exec_ctx *exec_ctx, s->received_bytes += incoming_frame_size; } + uint32_t target_incoming_window = grpc_chttp2_target_incoming_window(t); + GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("parse", t, incoming_window, + incoming_frame_size); + if (t->incoming_window <= target_incoming_window / 2) { + grpc_chttp2_initiate_write(exec_ctx, t, false, "flow_control"); + } + return GRPC_ERROR_NONE; } diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index 05e6f59947..2b9d93cae7 100644 --- a/src/core/ext/transport/chttp2/transport/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -152,6 +152,17 @@ static bool stream_ref_if_not_destroyed(gpr_refcount *r) { return true; } +uint32_t grpc_chttp2_target_incoming_window(grpc_chttp2_transport *t) { + return (uint32_t)GPR_MAX( + (int64_t)((1u << 31) - 1), + t->stream_total_over_incoming_window + + (int64_t)GPR_MAX( + t->settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] - + t->stream_total_under_incoming_window, + 0)); +} + bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { grpc_chttp2_stream *s; @@ -310,13 +321,12 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx, /* if the grpc_chttp2_transport is ready to send a window update, do so here also; 3/4 is a magic number that will likely get tuned soon */ - uint32_t target_incoming_window = GPR_MAX( - t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], - 1024); + uint32_t target_incoming_window = grpc_chttp2_target_incoming_window(t); uint32_t threshold_to_send_transport_window_update = t->outbuf.count > 0 ? 3 * target_incoming_window / 4 : target_incoming_window / 2; - if (t->incoming_window <= threshold_to_send_transport_window_update) { + if (t->incoming_window <= threshold_to_send_transport_window_update && + t->incoming_window != target_incoming_window) { maybe_initiate_ping(exec_ctx, t, GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE); uint32_t announced = (uint32_t)GPR_CLAMP( |