diff options
Diffstat (limited to 'src/core/ext/transport/chttp2')
10 files changed, 307 insertions, 101 deletions
diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.c b/src/core/ext/transport/chttp2/client/chttp2_connector.c index d0a762a280..eae0145ecc 100644 --- a/src/core/ext/transport/chttp2/client/chttp2_connector.c +++ b/src/core/ext/transport/chttp2/client/chttp2_connector.c @@ -226,7 +226,7 @@ static void chttp2_connector_connect(grpc_exec_ctx *exec_ctx, grpc_closure *notify) { chttp2_connector *c = (chttp2_connector *)con; grpc_resolved_address addr; - grpc_get_subchannel_address_arg(args->channel_args, &addr); + grpc_get_subchannel_address_arg(exec_ctx, args->channel_args, &addr); gpr_mu_lock(&c->mu); GPR_ASSERT(c->notify == NULL); c->notify = notify; @@ -248,8 +248,7 @@ static const grpc_connector_vtable chttp2_connector_vtable = { chttp2_connector_connect}; grpc_connector *grpc_chttp2_connector_create() { - chttp2_connector *c = gpr_malloc(sizeof(*c)); - memset(c, 0, sizeof(*c)); + chttp2_connector *c = gpr_zalloc(sizeof(*c)); c->base.vtable = &chttp2_connector_vtable; gpr_mu_init(&c->mu); gpr_ref_init(&c->refs, 1); diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c index 490a0c560e..067ac35a5a 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c @@ -72,8 +72,11 @@ static grpc_channel *client_channel_factory_create_channel( grpc_arg arg; arg.type = GRPC_ARG_STRING; arg.key = GRPC_ARG_SERVER_URI; - arg.value.string = grpc_resolver_factory_add_default_prefix_if_needed(target); - grpc_channel_args *new_args = grpc_channel_args_copy_and_add(args, &arg, 1); + arg.value.string = + grpc_resolver_factory_add_default_prefix_if_needed(exec_ctx, target); + const char *to_remove[] = {GRPC_ARG_SERVER_URI}; + grpc_channel_args *new_args = + grpc_channel_args_copy_and_add_and_remove(args, to_remove, 1, &arg, 1); gpr_free(arg.value.string); grpc_channel *channel = grpc_channel_create(exec_ctx, target, new_args, GRPC_CLIENT_CHANNEL, NULL); diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c index d8c18eb122..f0c241d68e 100644 --- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c +++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c @@ -83,7 +83,7 @@ static grpc_subchannel_args *get_secure_naming_subchannel_args( const char *server_uri_str = server_uri_arg->value.string; GPR_ASSERT(server_uri_str != NULL); grpc_uri *server_uri = - grpc_uri_parse(server_uri_str, true /* supress errors */); + grpc_uri_parse(exec_ctx, server_uri_str, true /* supress errors */); GPR_ASSERT(server_uri != NULL); const char *server_uri_path; server_uri_path = @@ -96,7 +96,7 @@ static grpc_subchannel_args *get_secure_naming_subchannel_args( const char *target_uri_str = grpc_get_subchannel_address_uri_arg(args->args); grpc_uri *target_uri = - grpc_uri_parse(target_uri_str, false /* suppress errors */); + grpc_uri_parse(exec_ctx, target_uri_str, false /* suppress errors */); GPR_ASSERT(target_uri != NULL); if (target_uri->path[0] != '\0') { // "path" may be empty const grpc_slice key = grpc_slice_from_static_string( @@ -181,8 +181,11 @@ static grpc_channel *client_channel_factory_create_channel( grpc_arg arg; arg.type = GRPC_ARG_STRING; arg.key = GRPC_ARG_SERVER_URI; - arg.value.string = grpc_resolver_factory_add_default_prefix_if_needed(target); - grpc_channel_args *new_args = grpc_channel_args_copy_and_add(args, &arg, 1); + arg.value.string = + grpc_resolver_factory_add_default_prefix_if_needed(exec_ctx, target); + const char *to_remove[] = {GRPC_ARG_SERVER_URI}; + grpc_channel_args *new_args = + grpc_channel_args_copy_and_add_and_remove(args, to_remove, 1, &arg, 1); gpr_free(arg.value.string); grpc_channel *channel = grpc_channel_create(exec_ctx, target, new_args, GRPC_CLIENT_CHANNEL, NULL); diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.c b/src/core/ext/transport/chttp2/server/chttp2_server.c index 0fc180d52f..837b9fd03d 100644 --- a/src/core/ext/transport/chttp2/server/chttp2_server.c +++ b/src/core/ext/transport/chttp2/server/chttp2_server.c @@ -222,8 +222,7 @@ grpc_error *grpc_chttp2_server_add_port(grpc_exec_ctx *exec_ctx, if (err != GRPC_ERROR_NONE) { goto error; } - state = gpr_malloc(sizeof(*state)); - memset(state, 0, sizeof(*state)); + state = gpr_zalloc(sizeof(*state)); grpc_closure_init(&state->tcp_server_shutdown_complete, tcp_server_shutdown_complete, state, grpc_schedule_on_exec_ctx); diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 28a3166832..082078c72f 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -48,16 +48,19 @@ #include "src/core/ext/transport/chttp2/transport/varint.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/http/parser.h" +#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/workqueue.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" +#include "src/core/lib/support/env.h" #include "src/core/lib/support/string.h" #include "src/core/lib/transport/error_utils.h" #include "src/core/lib/transport/http2_errors.h" #include "src/core/lib/transport/static_metadata.h" #include "src/core/lib/transport/status_conversion.h" #include "src/core/lib/transport/timeout_encoding.h" +#include "src/core/lib/transport/transport.h" #include "src/core/lib/transport/transport_impl.h" #define DEFAULT_WINDOW 65535 @@ -66,6 +69,10 @@ #define MAX_WRITE_BUFFER_SIZE (64 * 1024 * 1024) #define DEFAULT_MAX_HEADER_LIST_SIZE (16 * 1024) +#define DEFAULT_KEEPALIVE_TIME_SECOND INT_MAX +#define DEFAULT_KEEPALIVE_TIMEOUT_SECOND 20 +#define DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS false + #define MAX_CLIENT_STREAM_ID 0x7fffffffu int grpc_http_trace = 0; int grpc_flowctl_trace = 0; @@ -139,6 +146,16 @@ static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, #define DEFAULT_MIN_TIME_BETWEEN_PINGS_MS 0 #define DEFAULT_MAX_PINGS_BETWEEN_DATA 3 +/** keepalive-relevant functions */ +static void init_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error); +static void start_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error); +static void finish_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error); +static void keepalive_watchdog_fired_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error); + /******************************************************************************* * CONSTRUCTION/DESTRUCTION/REFCOUNTING */ @@ -218,8 +235,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) == GRPC_CHTTP2_CLIENT_CONNECT_STRLEN); - memset(t, 0, sizeof(*t)); - t->base.vtable = &vtable; t->ep = ep; /* one ref is for destroy */ @@ -255,6 +270,17 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_combiner_scheduler(t->combiner, false)); grpc_closure_init(&t->finish_bdp_ping_locked, finish_bdp_ping_locked, t, grpc_combiner_scheduler(t->combiner, false)); + grpc_closure_init(&t->init_keepalive_ping_locked, init_keepalive_ping_locked, + t, grpc_combiner_scheduler(t->combiner, false)); + grpc_closure_init(&t->start_keepalive_ping_locked, + start_keepalive_ping_locked, t, + grpc_combiner_scheduler(t->combiner, false)); + grpc_closure_init(&t->finish_keepalive_ping_locked, + finish_keepalive_ping_locked, t, + grpc_combiner_scheduler(t->combiner, false)); + grpc_closure_init(&t->keepalive_watchdog_fired_locked, + keepalive_watchdog_fired_locked, t, + grpc_combiner_scheduler(t->combiner, false)); grpc_bdp_estimator_init(&t->bdp_estimator, t->peer_string); t->last_pid_update = gpr_now(GPR_CLOCK_MONOTONIC); @@ -316,6 +342,18 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, gpr_time_from_millis(DEFAULT_MIN_TIME_BETWEEN_PINGS_MS, GPR_TIMESPAN), }; + /* client-side keepalive setting */ + t->keepalive_time = + DEFAULT_KEEPALIVE_TIME_SECOND == INT_MAX + ? gpr_inf_future(GPR_TIMESPAN) + : gpr_time_from_seconds(DEFAULT_KEEPALIVE_TIME_SECOND, GPR_TIMESPAN); + t->keepalive_timeout = + DEFAULT_KEEPALIVE_TIMEOUT_SECOND == INT_MAX + ? gpr_inf_future(GPR_TIMESPAN) + : gpr_time_from_seconds(DEFAULT_KEEPALIVE_TIMEOUT_SECOND, + GPR_TIMESPAN); + t->keepalive_permit_without_calls = DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS; + if (channel_args) { for (i = 0; i < channel_args->num_args; i++) { if (0 == strcmp(channel_args->args[i].key, @@ -363,6 +401,28 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_BDP_PROBE)) { t->enable_bdp_probe = grpc_channel_arg_get_integer( &channel_args->args[i], (grpc_integer_options){1, 0, 1}); + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_HTTP2_KEEPALIVE_TIME)) { + const int value = grpc_channel_arg_get_integer( + &channel_args->args[i], + (grpc_integer_options){DEFAULT_KEEPALIVE_TIME_SECOND, 1, INT_MAX}); + t->keepalive_time = value == INT_MAX + ? gpr_inf_future(GPR_TIMESPAN) + : gpr_time_from_seconds(value, GPR_TIMESPAN); + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_HTTP2_KEEPALIVE_TIMEOUT)) { + const int value = grpc_channel_arg_get_integer( + &channel_args->args[i], + (grpc_integer_options){DEFAULT_KEEPALIVE_TIMEOUT_SECOND, 0, + INT_MAX}); + t->keepalive_timeout = value == INT_MAX + ? gpr_inf_future(GPR_TIMESPAN) + : gpr_time_from_seconds(value, GPR_TIMESPAN); + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_HTTP2_KEEPALIVE_PERMIT_WITHOUT_CALLS)) { + t->keepalive_permit_without_calls = + (uint32_t)grpc_channel_arg_get_integer( + &channel_args->args[i], (grpc_integer_options){0, 0, 1}); } else { static const struct { const char *channel_arg_name; @@ -414,6 +474,16 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->ping_state.pings_before_data_required = t->ping_policy.max_pings_without_data; + /** Start client-side keepalive pings */ + if (t->is_client) { + t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING; + GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); + grpc_timer_init( + exec_ctx, &t->keepalive_ping_timer, + gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_time), + &t->init_keepalive_ping_locked, gpr_now(GPR_CLOCK_MONOTONIC)); + } + grpc_chttp2_initiate_write(exec_ctx, t, false, "init"); post_benign_reclaimer(exec_ctx, t); } @@ -441,6 +511,10 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_error *error) { if (!t->closed) { + if (!grpc_error_has_clear_grpc_status(error)) { + error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, + GRPC_STATUS_UNAVAILABLE); + } if (t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE) { if (t->close_transport_on_writes_finished == NULL) { t->close_transport_on_writes_finished = @@ -450,14 +524,26 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx, grpc_error_add_child(t->close_transport_on_writes_finished, error); return; } - if (!grpc_error_has_clear_grpc_status(error)) { - error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, - GRPC_STATUS_UNAVAILABLE); - } t->closed = 1; connectivity_state_set(exec_ctx, t, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), "close_transport"); grpc_endpoint_shutdown(exec_ctx, t->ep, GRPC_ERROR_REF(error)); + if (t->is_client) { + switch (t->keepalive_state) { + case GRPC_CHTTP2_KEEPALIVE_STATE_WAITING: { + grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer); + break; + } + case GRPC_CHTTP2_KEEPALIVE_STATE_PINGING: { + grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer); + grpc_timer_cancel(exec_ctx, &t->keepalive_watchdog_timer); + break; + } + case GRPC_CHTTP2_KEEPALIVE_STATE_DYING: { + break; + } + } + } /* flush writable stream list to avoid dangling references */ grpc_chttp2_stream *s; @@ -489,13 +575,11 @@ void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream *s) { static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_stream *gs, grpc_stream_refcount *refcount, - const void *server_data) { + const void *server_data, gpr_arena *arena) { GPR_TIMER_BEGIN("init_stream", 0); grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; - memset(s, 0, sizeof(*s)); - s->t = t; s->refcount = refcount; /* We reserve one 'active stream' that's dropped when the stream is @@ -504,8 +588,8 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, gpr_ref_init(&s->active_streams, 1); GRPC_CHTTP2_STREAM_REF(s, "chttp2"); - grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[0]); - grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[1]); + grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[0], arena); + grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[1], arena); grpc_chttp2_data_parser_init(&s->data_parser); grpc_slice_buffer_init(&s->flow_controlled_buffer); s->deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); @@ -581,16 +665,17 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, GPR_TIMER_END("destroy_stream", 0); - gpr_free(s->destroy_stream_arg); + grpc_closure_sched(exec_ctx, s->destroy_stream_arg, GRPC_ERROR_NONE); } static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs, void *and_free_memory) { + grpc_stream *gs, + grpc_closure *then_schedule_closure) { GPR_TIMER_BEGIN("destroy_stream", 0); grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; - s->destroy_stream_arg = and_free_memory; + s->destroy_stream_arg = then_schedule_closure; grpc_closure_sched( exec_ctx, grpc_closure_init(&s->destroy_stream, destroy_stream_locked, s, grpc_combiner_scheduler(t->combiner, false)), @@ -1545,15 +1630,19 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, s->recv_trailing_metadata_finished != NULL) { char status_string[GPR_LTOA_MIN_BUFSIZE]; gpr_ltoa(status, status_string); - grpc_chttp2_incoming_metadata_buffer_replace_or_add( - exec_ctx, &s->metadata_buffer[1], - grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_GRPC_STATUS, - grpc_slice_from_copied_string(status_string))); + GRPC_LOG_IF_ERROR("add_status", + grpc_chttp2_incoming_metadata_buffer_replace_or_add( + exec_ctx, &s->metadata_buffer[1], + grpc_mdelem_from_slices( + exec_ctx, GRPC_MDSTR_GRPC_STATUS, + grpc_slice_from_copied_string(status_string)))); if (msg != NULL) { - grpc_chttp2_incoming_metadata_buffer_replace_or_add( - exec_ctx, &s->metadata_buffer[1], - grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_GRPC_MESSAGE, - grpc_slice_from_copied_string(msg))); + GRPC_LOG_IF_ERROR( + "add_status_message", + grpc_chttp2_incoming_metadata_buffer_replace_or_add( + exec_ctx, &s->metadata_buffer[1], + grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_GRPC_MESSAGE, + grpc_slice_from_copied_string(msg)))); } s->published_metadata[1] = GRPC_METADATA_SYNTHESIZED_FROM_FAKE; grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s); @@ -1987,6 +2076,77 @@ static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping"); } +static void init_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_chttp2_transport *t = arg; + GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING); + if (error == GRPC_ERROR_NONE && !(t->destroying || t->closed)) { + if (t->keepalive_permit_without_calls || t->stream_map.count > 0) { + t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING; + GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end"); + send_ping_locked(exec_ctx, t, GRPC_CHTTP2_PING_ON_NEXT_WRITE, + &t->start_keepalive_ping_locked, + &t->finish_keepalive_ping_locked); + } else { + GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); + grpc_timer_init( + exec_ctx, &t->keepalive_ping_timer, + gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_time), + &t->init_keepalive_ping_locked, gpr_now(GPR_CLOCK_MONOTONIC)); + } + } + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "init keepalive ping"); +} + +static void start_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_chttp2_transport *t = arg; + GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive watchdog"); + grpc_timer_init( + exec_ctx, &t->keepalive_watchdog_timer, + gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_timeout), + &t->keepalive_watchdog_fired_locked, gpr_now(GPR_CLOCK_MONOTONIC)); +} + +static void finish_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_chttp2_transport *t = arg; + if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) { + if (error == GRPC_ERROR_NONE) { + t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING; + grpc_timer_cancel(exec_ctx, &t->keepalive_watchdog_timer); + GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); + grpc_timer_init( + exec_ctx, &t->keepalive_ping_timer, + gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_time), + grpc_closure_create(init_keepalive_ping_locked, t, + grpc_combiner_scheduler(t->combiner, false)), + gpr_now(GPR_CLOCK_MONOTONIC)); + } + } + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keepalive ping end"); +} + +static void keepalive_watchdog_fired_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_chttp2_transport *t = arg; + if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) { + if (error == GRPC_ERROR_NONE) { + t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING; + close_transport_locked(exec_ctx, t, + GRPC_ERROR_CREATE("keepalive watchdog timeout")); + } + } else { + /** The watchdog timer should have been cancelled by + finish_keepalive_ping_locked. */ + if (error != GRPC_ERROR_CANCELLED) { + gpr_log(GPR_ERROR, "keepalive_ping_end state error: %d (expect: %d)", + t->keepalive_state, GRPC_CHTTP2_KEEPALIVE_STATE_PINGING); + } + } + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keepalive watchdog"); +} + /******************************************************************************* * CALLBACK LOOP */ @@ -2428,7 +2588,7 @@ static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream), grpc_transport *grpc_create_chttp2_transport( grpc_exec_ctx *exec_ctx, const grpc_channel_args *channel_args, grpc_endpoint *ep, int is_client) { - grpc_chttp2_transport *t = gpr_malloc(sizeof(grpc_chttp2_transport)); + grpc_chttp2_transport *t = gpr_zalloc(sizeof(grpc_chttp2_transport)); init_transport(exec_ctx, t, channel_args, ep, is_client != 0); return &t->base; } diff --git a/src/core/ext/transport/chttp2/transport/frame_ping.c b/src/core/ext/transport/chttp2/transport/frame_ping.c index f487533c41..9b4b1a7b84 100644 --- a/src/core/ext/transport/chttp2/transport/frame_ping.c +++ b/src/core/ext/transport/chttp2/transport/frame_ping.c @@ -91,7 +91,7 @@ grpc_error *grpc_chttp2_ping_parser_parse(grpc_exec_ctx *exec_ctx, void *parser, grpc_chttp2_ping_parser *p = parser; while (p->byte != 8 && cur != end) { - p->opaque_8bytes |= (((uint64_t)*cur) << (8 * p->byte)); + p->opaque_8bytes |= (((uint64_t)*cur) << (56 - 8 * p->byte)); cur++; p->byte++; } diff --git a/src/core/ext/transport/chttp2/transport/incoming_metadata.c b/src/core/ext/transport/chttp2/transport/incoming_metadata.c index c91b019aa0..da0a34d32a 100644 --- a/src/core/ext/transport/chttp2/transport/incoming_metadata.c +++ b/src/core/ext/transport/chttp2/transport/incoming_metadata.c @@ -41,69 +41,48 @@ #include <grpc/support/log.h> void grpc_chttp2_incoming_metadata_buffer_init( - grpc_chttp2_incoming_metadata_buffer *buffer) { - buffer->deadline = gpr_inf_future(GPR_CLOCK_REALTIME); + grpc_chttp2_incoming_metadata_buffer *buffer, gpr_arena *arena) { + buffer->arena = arena; + grpc_metadata_batch_init(&buffer->batch); + buffer->batch.deadline = gpr_inf_future(GPR_CLOCK_REALTIME); } void grpc_chttp2_incoming_metadata_buffer_destroy( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer) { - size_t i; - if (!buffer->published) { - for (i = 0; i < buffer->count; i++) { - GRPC_MDELEM_UNREF(exec_ctx, buffer->elems[i].md); - } - } - gpr_free(buffer->elems); + grpc_metadata_batch_destroy(exec_ctx, &buffer->batch); } -void grpc_chttp2_incoming_metadata_buffer_add( - grpc_chttp2_incoming_metadata_buffer *buffer, grpc_mdelem elem) { - GPR_ASSERT(!buffer->published); - if (buffer->capacity == buffer->count) { - buffer->capacity = GPR_MAX(8, 2 * buffer->capacity); - buffer->elems = - gpr_realloc(buffer->elems, sizeof(*buffer->elems) * buffer->capacity); - } - buffer->elems[buffer->count++].md = elem; +grpc_error *grpc_chttp2_incoming_metadata_buffer_add( + grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer, + grpc_mdelem elem) { buffer->size += GRPC_MDELEM_LENGTH(elem); + return grpc_metadata_batch_add_tail( + exec_ctx, &buffer->batch, + gpr_arena_alloc(buffer->arena, sizeof(grpc_linked_mdelem)), elem); } -void grpc_chttp2_incoming_metadata_buffer_replace_or_add( +grpc_error *grpc_chttp2_incoming_metadata_buffer_replace_or_add( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer, grpc_mdelem elem) { - for (size_t i = 0; i < buffer->count; i++) { - if (grpc_slice_eq(GRPC_MDKEY(buffer->elems[i].md), GRPC_MDKEY(elem))) { - GRPC_MDELEM_UNREF(exec_ctx, buffer->elems[i].md); - buffer->elems[i].md = elem; - return; + for (grpc_linked_mdelem *l = buffer->batch.list.head; l != NULL; + l = l->next) { + if (grpc_slice_eq(GRPC_MDKEY(l->md), GRPC_MDKEY(elem))) { + GRPC_MDELEM_UNREF(exec_ctx, l->md); + l->md = elem; + return GRPC_ERROR_NONE; } } - grpc_chttp2_incoming_metadata_buffer_add(buffer, elem); + return grpc_chttp2_incoming_metadata_buffer_add(exec_ctx, buffer, elem); } void grpc_chttp2_incoming_metadata_buffer_set_deadline( grpc_chttp2_incoming_metadata_buffer *buffer, gpr_timespec deadline) { - GPR_ASSERT(!buffer->published); - buffer->deadline = deadline; + buffer->batch.deadline = deadline; } void grpc_chttp2_incoming_metadata_buffer_publish( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer, grpc_metadata_batch *batch) { - GPR_ASSERT(!buffer->published); - buffer->published = 1; - if (buffer->count > 0) { - size_t i; - for (i = 0; i < buffer->count; i++) { - /* TODO(ctiller): do something better here */ - if (!GRPC_LOG_IF_ERROR("grpc_chttp2_incoming_metadata_buffer_publish", - grpc_metadata_batch_link_tail( - exec_ctx, batch, &buffer->elems[i]))) { - GRPC_MDELEM_UNREF(exec_ctx, buffer->elems[i].md); - } - } - } else { - batch->list.head = batch->list.tail = NULL; - } - batch->deadline = buffer->deadline; + *batch = buffer->batch; + grpc_metadata_batch_init(&buffer->batch); } diff --git a/src/core/ext/transport/chttp2/transport/incoming_metadata.h b/src/core/ext/transport/chttp2/transport/incoming_metadata.h index 1eac6fc150..288c917e65 100644 --- a/src/core/ext/transport/chttp2/transport/incoming_metadata.h +++ b/src/core/ext/transport/chttp2/transport/incoming_metadata.h @@ -37,28 +37,26 @@ #include "src/core/lib/transport/transport.h" typedef struct { - grpc_linked_mdelem *elems; - size_t count; - size_t capacity; - gpr_timespec deadline; - int published; + gpr_arena *arena; + grpc_metadata_batch batch; size_t size; // total size of metadata } grpc_chttp2_incoming_metadata_buffer; /** assumes everything initially zeroed */ void grpc_chttp2_incoming_metadata_buffer_init( - grpc_chttp2_incoming_metadata_buffer *buffer); + grpc_chttp2_incoming_metadata_buffer *buffer, gpr_arena *arena); void grpc_chttp2_incoming_metadata_buffer_destroy( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer); void grpc_chttp2_incoming_metadata_buffer_publish( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer, grpc_metadata_batch *batch); -void grpc_chttp2_incoming_metadata_buffer_add( - grpc_chttp2_incoming_metadata_buffer *buffer, grpc_mdelem elem); -void grpc_chttp2_incoming_metadata_buffer_replace_or_add( +grpc_error *grpc_chttp2_incoming_metadata_buffer_add( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer, - grpc_mdelem elem); + grpc_mdelem elem) GRPC_MUST_USE_RESULT; +grpc_error *grpc_chttp2_incoming_metadata_buffer_replace_or_add( + grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer, + grpc_mdelem elem) GRPC_MUST_USE_RESULT; void grpc_chttp2_incoming_metadata_buffer_set_deadline( grpc_chttp2_incoming_metadata_buffer *buffer, gpr_timespec deadline); diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 5d41f4bfda..3c56c21599 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -50,6 +50,7 @@ #include "src/core/ext/transport/chttp2/transport/stream_map.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/endpoint.h" +#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/transport/bdp_estimator.h" #include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/pid_controller.h" @@ -208,6 +209,12 @@ struct grpc_chttp2_incoming_byte_stream { grpc_closure finished_action; }; +typedef enum { + GRPC_CHTTP2_KEEPALIVE_STATE_WAITING, + GRPC_CHTTP2_KEEPALIVE_STATE_PINGING, + GRPC_CHTTP2_KEEPALIVE_STATE_DYING, +} grpc_chttp2_keepalive_state; + struct grpc_chttp2_transport { grpc_transport base; /* must be first */ gpr_refcount refs; @@ -382,6 +389,28 @@ struct grpc_chttp2_transport { grpc_closure benign_reclaimer_locked; /** destructive cleanup closure */ grpc_closure destructive_reclaimer_locked; + + /* keep-alive ping support */ + /** Closure to initialize a keepalive ping */ + grpc_closure init_keepalive_ping_locked; + /** Closure to run when the keepalive ping is sent */ + grpc_closure start_keepalive_ping_locked; + /** Cousure to run when the keepalive ping ack is received */ + grpc_closure finish_keepalive_ping_locked; + /** Closrue to run when the keepalive ping timeouts */ + grpc_closure keepalive_watchdog_fired_locked; + /** timer to initiate ping events */ + grpc_timer keepalive_ping_timer; + /** watchdog to kill the transport when waiting for the keepalive ping */ + grpc_timer keepalive_watchdog_timer; + /** time duration in between pings */ + gpr_timespec keepalive_time; + /** grace period for a ping to complete before watchdog kicks in */ + gpr_timespec keepalive_timeout; + /** if keepalive pings are allowed when there's no outstanding streams */ + bool keepalive_permit_without_calls; + /** keep-alive state machine state */ + grpc_chttp2_keepalive_state keepalive_state; }; typedef enum { @@ -396,7 +425,7 @@ struct grpc_chttp2_stream { grpc_stream_refcount *refcount; grpc_closure destroy_stream; - void *destroy_stream_arg; + grpc_closure *destroy_stream_arg; grpc_chttp2_stream_link links[STREAM_LIST_COUNT]; uint8_t included[STREAM_LIST_COUNT]; diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c index 7ed00522c3..7efc8c63c9 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.c +++ b/src/core/ext/transport/chttp2/transport/parsing.c @@ -381,16 +381,38 @@ static grpc_error *update_incoming_window(grpc_exec_ctx *exec_ctx, s->incoming_window_delta + t->settings[GRPC_ACKED_SETTINGS] [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]) { - char *msg; - gpr_asprintf(&msg, - "frame of size %d overflows incoming window of %" PRId64, - t->incoming_frame_size, - s->incoming_window_delta + - t->settings[GRPC_ACKED_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]); - grpc_error *err = GRPC_ERROR_CREATE(msg); - gpr_free(msg); - return err; + if (incoming_frame_size <= + s->incoming_window_delta + + t->settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]) { + gpr_log( + GPR_ERROR, + "Incoming frame of size %d exceeds incoming window size of %" PRId64 + ".\n" + "The (un-acked, future) window size would be %" PRId64 + " which is not exceeded.\n" + "This would usually cause a disconnection, but allowing it due to " + "broken HTTP2 implementations in the wild.\n" + "See (for example) https://github.com/netty/netty/issues/6520.", + t->incoming_frame_size, + s->incoming_window_delta + + t->settings[GRPC_ACKED_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], + s->incoming_window_delta + + t->settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]); + } else { + char *msg; + gpr_asprintf(&msg, + "frame of size %d overflows incoming window of %" PRId64, + t->incoming_frame_size, + s->incoming_window_delta + + t->settings[GRPC_ACKED_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]); + grpc_error *err = GRPC_ERROR_CREATE(msg); + gpr_free(msg); + return err; + } } GRPC_CHTTP2_FLOW_DEBIT_STREAM_INCOMING_WINDOW_DELTA("parse", t, s, @@ -526,7 +548,14 @@ static void on_initial_header(grpc_exec_ctx *exec_ctx, void *tp, s->seen_error = true; GRPC_MDELEM_UNREF(exec_ctx, md); } else { - grpc_chttp2_incoming_metadata_buffer_add(&s->metadata_buffer[0], md); + grpc_error *error = grpc_chttp2_incoming_metadata_buffer_add( + exec_ctx, &s->metadata_buffer[0], md); + if (error != GRPC_ERROR_NONE) { + grpc_chttp2_cancel_stream(exec_ctx, t, s, error); + grpc_chttp2_parsing_become_skip_parser(exec_ctx, t); + s->seen_error = true; + GRPC_MDELEM_UNREF(exec_ctx, md); + } } } @@ -576,7 +605,14 @@ static void on_trailing_header(grpc_exec_ctx *exec_ctx, void *tp, s->seen_error = true; GRPC_MDELEM_UNREF(exec_ctx, md); } else { - grpc_chttp2_incoming_metadata_buffer_add(&s->metadata_buffer[1], md); + grpc_error *error = grpc_chttp2_incoming_metadata_buffer_add( + exec_ctx, &s->metadata_buffer[1], md); + if (error != GRPC_ERROR_NONE) { + grpc_chttp2_cancel_stream(exec_ctx, t, s, error); + grpc_chttp2_parsing_become_skip_parser(exec_ctx, t); + s->seen_error = true; + GRPC_MDELEM_UNREF(exec_ctx, md); + } } GPR_TIMER_END("on_trailing_header", 0); |