diff options
author | Mark D. Roth <roth@google.com> | 2017-03-17 11:04:09 -0700 |
---|---|---|
committer | Mark D. Roth <roth@google.com> | 2017-03-17 11:04:09 -0700 |
commit | f3792b2024961430878ed770db07bf94526729c6 (patch) | |
tree | 4092d6575a49d9b0f6daf86adab47b2c2c70ee54 /src/core/ext/transport | |
parent | 1dcd922ce65a00892e0815262b8446af06ad882b (diff) | |
parent | 11c57e89e5b6f869dab744aeb2b0a7dc5be4ca5e (diff) |
Merge remote-tracking branch 'upstream/master' into remove_initial_connect_string
Diffstat (limited to 'src/core/ext/transport')
17 files changed, 758 insertions, 323 deletions
diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.c b/src/core/ext/transport/chttp2/client/chttp2_connector.c index e7d6801a44..d49c32b671 100644 --- a/src/core/ext/transport/chttp2/client/chttp2_connector.c +++ b/src/core/ext/transport/chttp2/client/chttp2_connector.c @@ -191,7 +191,7 @@ static void chttp2_connector_connect(grpc_exec_ctx *exec_ctx, grpc_closure *notify) { chttp2_connector *c = (chttp2_connector *)con; grpc_resolved_address addr; - grpc_get_subchannel_address_arg(args->channel_args, &addr); + grpc_get_subchannel_address_arg(exec_ctx, args->channel_args, &addr); gpr_mu_lock(&c->mu); GPR_ASSERT(c->notify == NULL); c->notify = notify; @@ -213,8 +213,7 @@ static const grpc_connector_vtable chttp2_connector_vtable = { chttp2_connector_connect}; grpc_connector *grpc_chttp2_connector_create() { - chttp2_connector *c = gpr_malloc(sizeof(*c)); - memset(c, 0, sizeof(*c)); + chttp2_connector *c = gpr_zalloc(sizeof(*c)); c->base.vtable = &chttp2_connector_vtable; gpr_mu_init(&c->mu); gpr_ref_init(&c->refs, 1); diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c index 490a0c560e..067ac35a5a 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c @@ -72,8 +72,11 @@ static grpc_channel *client_channel_factory_create_channel( grpc_arg arg; arg.type = GRPC_ARG_STRING; arg.key = GRPC_ARG_SERVER_URI; - arg.value.string = grpc_resolver_factory_add_default_prefix_if_needed(target); - grpc_channel_args *new_args = grpc_channel_args_copy_and_add(args, &arg, 1); + arg.value.string = + grpc_resolver_factory_add_default_prefix_if_needed(exec_ctx, target); + const char *to_remove[] = {GRPC_ARG_SERVER_URI}; + grpc_channel_args *new_args = + grpc_channel_args_copy_and_add_and_remove(args, to_remove, 1, &arg, 1); gpr_free(arg.value.string); grpc_channel *channel = grpc_channel_create(exec_ctx, target, new_args, GRPC_CLIENT_CHANNEL, NULL); diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c index d8c18eb122..f0c241d68e 100644 --- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c +++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c @@ -83,7 +83,7 @@ static grpc_subchannel_args *get_secure_naming_subchannel_args( const char *server_uri_str = server_uri_arg->value.string; GPR_ASSERT(server_uri_str != NULL); grpc_uri *server_uri = - grpc_uri_parse(server_uri_str, true /* supress errors */); + grpc_uri_parse(exec_ctx, server_uri_str, true /* supress errors */); GPR_ASSERT(server_uri != NULL); const char *server_uri_path; server_uri_path = @@ -96,7 +96,7 @@ static grpc_subchannel_args *get_secure_naming_subchannel_args( const char *target_uri_str = grpc_get_subchannel_address_uri_arg(args->args); grpc_uri *target_uri = - grpc_uri_parse(target_uri_str, false /* suppress errors */); + grpc_uri_parse(exec_ctx, target_uri_str, false /* suppress errors */); GPR_ASSERT(target_uri != NULL); if (target_uri->path[0] != '\0') { // "path" may be empty const grpc_slice key = grpc_slice_from_static_string( @@ -181,8 +181,11 @@ static grpc_channel *client_channel_factory_create_channel( grpc_arg arg; arg.type = GRPC_ARG_STRING; arg.key = GRPC_ARG_SERVER_URI; - arg.value.string = grpc_resolver_factory_add_default_prefix_if_needed(target); - grpc_channel_args *new_args = grpc_channel_args_copy_and_add(args, &arg, 1); + arg.value.string = + grpc_resolver_factory_add_default_prefix_if_needed(exec_ctx, target); + const char *to_remove[] = {GRPC_ARG_SERVER_URI}; + grpc_channel_args *new_args = + grpc_channel_args_copy_and_add_and_remove(args, to_remove, 1, &arg, 1); gpr_free(arg.value.string); grpc_channel *channel = grpc_channel_create(exec_ctx, target, new_args, GRPC_CLIENT_CHANNEL, NULL); diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.c b/src/core/ext/transport/chttp2/server/chttp2_server.c index ae2c3838ed..837b9fd03d 100644 --- a/src/core/ext/transport/chttp2/server/chttp2_server.c +++ b/src/core/ext/transport/chttp2/server/chttp2_server.c @@ -55,11 +55,6 @@ #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/server.h" -typedef struct pending_handshake_manager_node { - grpc_handshake_manager *handshake_mgr; - struct pending_handshake_manager_node *next; -} pending_handshake_manager_node; - typedef struct { grpc_server *server; grpc_tcp_server *tcp_server; @@ -68,7 +63,7 @@ typedef struct { bool shutdown; grpc_closure tcp_server_shutdown_complete; grpc_closure *server_destroy_listener_done; - pending_handshake_manager_node *pending_handshake_mgrs; + grpc_handshake_manager *pending_handshake_mgrs; } server_state; typedef struct { @@ -78,44 +73,6 @@ typedef struct { grpc_handshake_manager *handshake_mgr; } server_connection_state; -static void pending_handshake_manager_add_locked( - server_state *state, grpc_handshake_manager *handshake_mgr) { - pending_handshake_manager_node *node = gpr_malloc(sizeof(*node)); - node->handshake_mgr = handshake_mgr; - node->next = state->pending_handshake_mgrs; - state->pending_handshake_mgrs = node; -} - -static void pending_handshake_manager_remove_locked( - server_state *state, grpc_handshake_manager *handshake_mgr) { - pending_handshake_manager_node **prev_node = &state->pending_handshake_mgrs; - for (pending_handshake_manager_node *node = state->pending_handshake_mgrs; - node != NULL; node = node->next) { - if (node->handshake_mgr == handshake_mgr) { - *prev_node = node->next; - gpr_free(node); - break; - } - prev_node = &node->next; - } -} - -static void pending_handshake_manager_shutdown_locked(grpc_exec_ctx *exec_ctx, - server_state *state, - grpc_error *why) { - pending_handshake_manager_node *prev_node = NULL; - for (pending_handshake_manager_node *node = state->pending_handshake_mgrs; - node != NULL; node = node->next) { - grpc_handshake_manager_shutdown(exec_ctx, node->handshake_mgr, - GRPC_ERROR_REF(why)); - gpr_free(prev_node); - prev_node = node; - } - gpr_free(prev_node); - state->pending_handshake_mgrs = NULL; - GRPC_ERROR_UNREF(why); -} - static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_handshaker_args *args = arg; @@ -153,8 +110,9 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_channel_args_destroy(exec_ctx, args->args); } } - pending_handshake_manager_remove_locked(connection_state->server_state, - connection_state->handshake_mgr); + grpc_handshake_manager_pending_list_remove( + &connection_state->server_state->pending_handshake_mgrs, + connection_state->handshake_mgr); gpr_mu_unlock(&connection_state->server_state->mu); grpc_handshake_manager_destroy(exec_ctx, connection_state->handshake_mgr); grpc_tcp_server_unref(exec_ctx, connection_state->server_state->tcp_server); @@ -174,7 +132,8 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp, return; } grpc_handshake_manager *handshake_mgr = grpc_handshake_manager_create(); - pending_handshake_manager_add_locked(state, handshake_mgr); + grpc_handshake_manager_pending_list_add(&state->pending_handshake_mgrs, + handshake_mgr); gpr_mu_unlock(&state->mu); grpc_tcp_server_ref(state->tcp_server); server_connection_state *connection_state = @@ -213,8 +172,8 @@ static void tcp_server_shutdown_complete(grpc_exec_ctx *exec_ctx, void *arg, gpr_mu_lock(&state->mu); grpc_closure *destroy_done = state->server_destroy_listener_done; GPR_ASSERT(state->shutdown); - pending_handshake_manager_shutdown_locked(exec_ctx, state, - GRPC_ERROR_REF(error)); + grpc_handshake_manager_pending_list_shutdown_all( + exec_ctx, state->pending_handshake_mgrs, GRPC_ERROR_REF(error)); gpr_mu_unlock(&state->mu); // Flush queued work before destroying handshaker factory, since that // may do a synchronous unref. @@ -263,8 +222,7 @@ grpc_error *grpc_chttp2_server_add_port(grpc_exec_ctx *exec_ctx, if (err != GRPC_ERROR_NONE) { goto error; } - state = gpr_malloc(sizeof(*state)); - memset(state, 0, sizeof(*state)); + state = gpr_zalloc(sizeof(*state)); grpc_closure_init(&state->tcp_server_shutdown_complete, tcp_server_shutdown_complete, state, grpc_schedule_on_exec_ctx); diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 8a9eaa8b6a..082078c72f 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -48,16 +48,19 @@ #include "src/core/ext/transport/chttp2/transport/varint.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/http/parser.h" +#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/workqueue.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" +#include "src/core/lib/support/env.h" #include "src/core/lib/support/string.h" #include "src/core/lib/transport/error_utils.h" #include "src/core/lib/transport/http2_errors.h" #include "src/core/lib/transport/static_metadata.h" #include "src/core/lib/transport/status_conversion.h" #include "src/core/lib/transport/timeout_encoding.h" +#include "src/core/lib/transport/transport.h" #include "src/core/lib/transport/transport_impl.h" #define DEFAULT_WINDOW 65535 @@ -66,6 +69,10 @@ #define MAX_WRITE_BUFFER_SIZE (64 * 1024 * 1024) #define DEFAULT_MAX_HEADER_LIST_SIZE (16 * 1024) +#define DEFAULT_KEEPALIVE_TIME_SECOND INT_MAX +#define DEFAULT_KEEPALIVE_TIMEOUT_SECOND 20 +#define DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS false + #define MAX_CLIENT_STREAM_ID 0x7fffffffu int grpc_http_trace = 0; int grpc_flowctl_trace = 0; @@ -139,6 +146,16 @@ static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, #define DEFAULT_MIN_TIME_BETWEEN_PINGS_MS 0 #define DEFAULT_MAX_PINGS_BETWEEN_DATA 3 +/** keepalive-relevant functions */ +static void init_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error); +static void start_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error); +static void finish_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error); +static void keepalive_watchdog_fired_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error); + /******************************************************************************* * CONSTRUCTION/DESTRUCTION/REFCOUNTING */ @@ -168,7 +185,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream_map_destroy(&t->stream_map); grpc_connectivity_state_destroy(exec_ctx, &t->channel_callback.state_tracker); - grpc_combiner_destroy(exec_ctx, t->combiner); + GRPC_COMBINER_UNREF(exec_ctx, t->combiner, "chttp2_transport"); cancel_pings(exec_ctx, t, GRPC_ERROR_CREATE("Transport destroyed")); @@ -218,8 +235,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) == GRPC_CHTTP2_CLIENT_CONNECT_STRLEN); - memset(t, 0, sizeof(*t)); - t->base.vtable = &vtable; t->ep = ep; /* one ref is for destroy */ @@ -255,6 +270,17 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_combiner_scheduler(t->combiner, false)); grpc_closure_init(&t->finish_bdp_ping_locked, finish_bdp_ping_locked, t, grpc_combiner_scheduler(t->combiner, false)); + grpc_closure_init(&t->init_keepalive_ping_locked, init_keepalive_ping_locked, + t, grpc_combiner_scheduler(t->combiner, false)); + grpc_closure_init(&t->start_keepalive_ping_locked, + start_keepalive_ping_locked, t, + grpc_combiner_scheduler(t->combiner, false)); + grpc_closure_init(&t->finish_keepalive_ping_locked, + finish_keepalive_ping_locked, t, + grpc_combiner_scheduler(t->combiner, false)); + grpc_closure_init(&t->keepalive_watchdog_fired_locked, + keepalive_watchdog_fired_locked, t, + grpc_combiner_scheduler(t->combiner, false)); grpc_bdp_estimator_init(&t->bdp_estimator, t->peer_string); t->last_pid_update = gpr_now(GPR_CLOCK_MONOTONIC); @@ -265,7 +291,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, .gain_d = 0, .initial_control_value = log2(DEFAULT_WINDOW), .min_control_value = -1, - .max_control_value = 22, + .max_control_value = 25, .integral_range = 10}); grpc_chttp2_goaway_parser_init(&t->goaway_parser); @@ -316,6 +342,18 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, gpr_time_from_millis(DEFAULT_MIN_TIME_BETWEEN_PINGS_MS, GPR_TIMESPAN), }; + /* client-side keepalive setting */ + t->keepalive_time = + DEFAULT_KEEPALIVE_TIME_SECOND == INT_MAX + ? gpr_inf_future(GPR_TIMESPAN) + : gpr_time_from_seconds(DEFAULT_KEEPALIVE_TIME_SECOND, GPR_TIMESPAN); + t->keepalive_timeout = + DEFAULT_KEEPALIVE_TIMEOUT_SECOND == INT_MAX + ? gpr_inf_future(GPR_TIMESPAN) + : gpr_time_from_seconds(DEFAULT_KEEPALIVE_TIMEOUT_SECOND, + GPR_TIMESPAN); + t->keepalive_permit_without_calls = DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS; + if (channel_args) { for (i = 0; i < channel_args->num_args; i++) { if (0 == strcmp(channel_args->args[i].key, @@ -363,6 +401,28 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_BDP_PROBE)) { t->enable_bdp_probe = grpc_channel_arg_get_integer( &channel_args->args[i], (grpc_integer_options){1, 0, 1}); + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_HTTP2_KEEPALIVE_TIME)) { + const int value = grpc_channel_arg_get_integer( + &channel_args->args[i], + (grpc_integer_options){DEFAULT_KEEPALIVE_TIME_SECOND, 1, INT_MAX}); + t->keepalive_time = value == INT_MAX + ? gpr_inf_future(GPR_TIMESPAN) + : gpr_time_from_seconds(value, GPR_TIMESPAN); + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_HTTP2_KEEPALIVE_TIMEOUT)) { + const int value = grpc_channel_arg_get_integer( + &channel_args->args[i], + (grpc_integer_options){DEFAULT_KEEPALIVE_TIMEOUT_SECOND, 0, + INT_MAX}); + t->keepalive_timeout = value == INT_MAX + ? gpr_inf_future(GPR_TIMESPAN) + : gpr_time_from_seconds(value, GPR_TIMESPAN); + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_HTTP2_KEEPALIVE_PERMIT_WITHOUT_CALLS)) { + t->keepalive_permit_without_calls = + (uint32_t)grpc_channel_arg_get_integer( + &channel_args->args[i], (grpc_integer_options){0, 0, 1}); } else { static const struct { const char *channel_arg_name; @@ -414,6 +474,16 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->ping_state.pings_before_data_required = t->ping_policy.max_pings_without_data; + /** Start client-side keepalive pings */ + if (t->is_client) { + t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING; + GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); + grpc_timer_init( + exec_ctx, &t->keepalive_ping_timer, + gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_time), + &t->init_keepalive_ping_locked, gpr_now(GPR_CLOCK_MONOTONIC)); + } + grpc_chttp2_initiate_write(exec_ctx, t, false, "init"); post_benign_reclaimer(exec_ctx, t); } @@ -441,6 +511,10 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_error *error) { if (!t->closed) { + if (!grpc_error_has_clear_grpc_status(error)) { + error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, + GRPC_STATUS_UNAVAILABLE); + } if (t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE) { if (t->close_transport_on_writes_finished == NULL) { t->close_transport_on_writes_finished = @@ -450,14 +524,26 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx, grpc_error_add_child(t->close_transport_on_writes_finished, error); return; } - if (!grpc_error_has_clear_grpc_status(error)) { - error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, - GRPC_STATUS_UNAVAILABLE); - } t->closed = 1; connectivity_state_set(exec_ctx, t, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), "close_transport"); grpc_endpoint_shutdown(exec_ctx, t->ep, GRPC_ERROR_REF(error)); + if (t->is_client) { + switch (t->keepalive_state) { + case GRPC_CHTTP2_KEEPALIVE_STATE_WAITING: { + grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer); + break; + } + case GRPC_CHTTP2_KEEPALIVE_STATE_PINGING: { + grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer); + grpc_timer_cancel(exec_ctx, &t->keepalive_watchdog_timer); + break; + } + case GRPC_CHTTP2_KEEPALIVE_STATE_DYING: { + break; + } + } + } /* flush writable stream list to avoid dangling references */ grpc_chttp2_stream *s; @@ -489,13 +575,11 @@ void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream *s) { static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_stream *gs, grpc_stream_refcount *refcount, - const void *server_data) { + const void *server_data, gpr_arena *arena) { GPR_TIMER_BEGIN("init_stream", 0); grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; - memset(s, 0, sizeof(*s)); - s->t = t; s->refcount = refcount; /* We reserve one 'active stream' that's dropped when the stream is @@ -504,8 +588,8 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, gpr_ref_init(&s->active_streams, 1); GRPC_CHTTP2_STREAM_REF(s, "chttp2"); - grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[0]); - grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[1]); + grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[0], arena); + grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[1], arena); grpc_chttp2_data_parser_init(&s->data_parser); grpc_slice_buffer_init(&s->flow_controlled_buffer); s->deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); @@ -569,20 +653,29 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, GRPC_ERROR_UNREF(s->read_closed_error); GRPC_ERROR_UNREF(s->write_closed_error); + if (s->incoming_window_delta > 0) { + GRPC_CHTTP2_FLOW_DEBIT_STREAM_INCOMING_WINDOW_DELTA( + "destroy", t, s, s->incoming_window_delta); + } else if (s->incoming_window_delta < 0) { + GRPC_CHTTP2_FLOW_CREDIT_STREAM_INCOMING_WINDOW_DELTA( + "destroy", t, s, -s->incoming_window_delta); + } + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "stream"); GPR_TIMER_END("destroy_stream", 0); - gpr_free(s->destroy_stream_arg); + grpc_closure_sched(exec_ctx, s->destroy_stream_arg, GRPC_ERROR_NONE); } static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs, void *and_free_memory) { + grpc_stream *gs, + grpc_closure *then_schedule_closure) { GPR_TIMER_BEGIN("destroy_stream", 0); grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; - s->destroy_stream_arg = and_free_memory; + s->destroy_stream_arg = then_schedule_closure; grpc_closure_sched( exec_ctx, grpc_closure_init(&s->destroy_stream, destroy_stream_locked, s, grpc_combiner_scheduler(t->combiner, false)), @@ -1033,8 +1126,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, GPR_TIMER_BEGIN("perform_stream_op_locked", 0); grpc_transport_stream_op *op = stream_op; - grpc_chttp2_transport *t = op->transport_private.args[0]; - grpc_chttp2_stream *s = op->transport_private.args[1]; + grpc_chttp2_transport *t = op->handler_private.args[0]; + grpc_chttp2_stream *s = op->handler_private.args[1]; if (grpc_http_trace) { char *str = grpc_transport_stream_op_string(op); @@ -1106,8 +1199,11 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, grpc_chttp2_list_add_waiting_for_concurrency(t, s); maybe_start_some_streams(exec_ctx, t); } else { - grpc_chttp2_cancel_stream(exec_ctx, t, s, - GRPC_ERROR_CREATE("Transport closed")); + grpc_chttp2_cancel_stream( + exec_ctx, t, s, + grpc_error_set_int(GRPC_ERROR_CREATE("Transport closed"), + GRPC_ERROR_INT_GRPC_STATUS, + GRPC_STATUS_UNAVAILABLE)); } } else { GPR_ASSERT(s->id != 0); @@ -1255,13 +1351,13 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, gpr_free(str); } - op->transport_private.args[0] = gt; - op->transport_private.args[1] = gs; + op->handler_private.args[0] = gt; + op->handler_private.args[1] = gs; GRPC_CHTTP2_STREAM_REF(s, "perform_stream_op"); grpc_closure_sched( exec_ctx, grpc_closure_init( - &op->transport_private.closure, perform_stream_op_locked, op, + &op->handler_private.closure, perform_stream_op_locked, op, grpc_combiner_scheduler(t->combiner, op->covered_by_poller)), GRPC_ERROR_NONE); GPR_TIMER_END("perform_stream_op", 0); @@ -1534,15 +1630,19 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, s->recv_trailing_metadata_finished != NULL) { char status_string[GPR_LTOA_MIN_BUFSIZE]; gpr_ltoa(status, status_string); - grpc_chttp2_incoming_metadata_buffer_replace_or_add( - exec_ctx, &s->metadata_buffer[1], - grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_GRPC_STATUS, - grpc_slice_from_copied_string(status_string))); + GRPC_LOG_IF_ERROR("add_status", + grpc_chttp2_incoming_metadata_buffer_replace_or_add( + exec_ctx, &s->metadata_buffer[1], + grpc_mdelem_from_slices( + exec_ctx, GRPC_MDSTR_GRPC_STATUS, + grpc_slice_from_copied_string(status_string)))); if (msg != NULL) { - grpc_chttp2_incoming_metadata_buffer_replace_or_add( - exec_ctx, &s->metadata_buffer[1], - grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_GRPC_MESSAGE, - grpc_slice_from_copied_string(msg))); + GRPC_LOG_IF_ERROR( + "add_status_message", + grpc_chttp2_incoming_metadata_buffer_replace_or_add( + exec_ctx, &s->metadata_buffer[1], + grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_GRPC_MESSAGE, + grpc_slice_from_copied_string(msg)))); } s->published_metadata[1] = GRPC_METADATA_SYNTHESIZED_FROM_FAKE; grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s); @@ -1801,13 +1901,13 @@ static void update_bdp(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, if (delta == 0 || (bdp != 0 && delta > -1024 && delta < 1024)) { return; } + if (grpc_bdp_estimator_trace) { + gpr_log(GPR_DEBUG, "%s: update initial window size to %d", t->peer_string, + (int)bdp); + } push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, bdp); } -/******************************************************************************* - * INPUT PROCESSING - PARSING - */ - static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { grpc_http_parser parser; @@ -1976,6 +2076,77 @@ static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping"); } +static void init_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_chttp2_transport *t = arg; + GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING); + if (error == GRPC_ERROR_NONE && !(t->destroying || t->closed)) { + if (t->keepalive_permit_without_calls || t->stream_map.count > 0) { + t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING; + GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end"); + send_ping_locked(exec_ctx, t, GRPC_CHTTP2_PING_ON_NEXT_WRITE, + &t->start_keepalive_ping_locked, + &t->finish_keepalive_ping_locked); + } else { + GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); + grpc_timer_init( + exec_ctx, &t->keepalive_ping_timer, + gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_time), + &t->init_keepalive_ping_locked, gpr_now(GPR_CLOCK_MONOTONIC)); + } + } + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "init keepalive ping"); +} + +static void start_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_chttp2_transport *t = arg; + GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive watchdog"); + grpc_timer_init( + exec_ctx, &t->keepalive_watchdog_timer, + gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_timeout), + &t->keepalive_watchdog_fired_locked, gpr_now(GPR_CLOCK_MONOTONIC)); +} + +static void finish_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_chttp2_transport *t = arg; + if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) { + if (error == GRPC_ERROR_NONE) { + t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING; + grpc_timer_cancel(exec_ctx, &t->keepalive_watchdog_timer); + GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); + grpc_timer_init( + exec_ctx, &t->keepalive_ping_timer, + gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_time), + grpc_closure_create(init_keepalive_ping_locked, t, + grpc_combiner_scheduler(t->combiner, false)), + gpr_now(GPR_CLOCK_MONOTONIC)); + } + } + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keepalive ping end"); +} + +static void keepalive_watchdog_fired_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_chttp2_transport *t = arg; + if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) { + if (error == GRPC_ERROR_NONE) { + t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING; + close_transport_locked(exec_ctx, t, + GRPC_ERROR_CREATE("keepalive watchdog timeout")); + } + } else { + /** The watchdog timer should have been cancelled by + finish_keepalive_ping_locked. */ + if (error != GRPC_ERROR_CANCELLED) { + gpr_log(GPR_ERROR, "keepalive_ping_end state error: %d (expect: %d)", + t->keepalive_state, GRPC_CHTTP2_KEEPALIVE_STATE_PINGING); + } + } + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keepalive watchdog"); +} + /******************************************************************************* * CALLBACK LOOP */ @@ -2054,8 +2225,8 @@ static void incoming_byte_stream_update_flow_control(grpc_exec_ctx *exec_ctx, (int64_t)have_already) { write_type = GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED; } - GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", t, s, incoming_window_delta, - add_max_recv_bytes); + GRPC_CHTTP2_FLOW_CREDIT_STREAM_INCOMING_WINDOW_DELTA("op", t, s, + add_max_recv_bytes); GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", t, s, announce_window, add_max_recv_bytes); if ((int64_t)s->incoming_window_delta + (int64_t)initial_window_size - @@ -2417,7 +2588,7 @@ static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream), grpc_transport *grpc_create_chttp2_transport( grpc_exec_ctx *exec_ctx, const grpc_channel_args *channel_args, grpc_endpoint *ep, int is_client) { - grpc_chttp2_transport *t = gpr_malloc(sizeof(grpc_chttp2_transport)); + grpc_chttp2_transport *t = gpr_zalloc(sizeof(grpc_chttp2_transport)); init_transport(exec_ctx, t, channel_args, ep, is_client != 0); return &t->base; } diff --git a/src/core/ext/transport/chttp2/transport/frame_ping.c b/src/core/ext/transport/chttp2/transport/frame_ping.c index f487533c41..9b4b1a7b84 100644 --- a/src/core/ext/transport/chttp2/transport/frame_ping.c +++ b/src/core/ext/transport/chttp2/transport/frame_ping.c @@ -91,7 +91,7 @@ grpc_error *grpc_chttp2_ping_parser_parse(grpc_exec_ctx *exec_ctx, void *parser, grpc_chttp2_ping_parser *p = parser; while (p->byte != 8 && cur != end) { - p->opaque_8bytes |= (((uint64_t)*cur) << (8 * p->byte)); + p->opaque_8bytes |= (((uint64_t)*cur) << (56 - 8 * p->byte)); cur++; p->byte++; } diff --git a/src/core/ext/transport/chttp2/transport/frame_settings.h b/src/core/ext/transport/chttp2/transport/frame_settings.h index a29dc82106..44137798c0 100644 --- a/src/core/ext/transport/chttp2/transport/frame_settings.h +++ b/src/core/ext/transport/chttp2/transport/frame_settings.h @@ -87,7 +87,7 @@ extern const grpc_chttp2_setting_parameters grpc_chttp2_settings_parameters[GRPC_CHTTP2_NUM_SETTINGS]; /* Create a settings frame by diffing old & new, and updating old to be new */ -grpc_slice grpc_chttp2_settings_create(uint32_t *old, const uint32_t *new, +grpc_slice grpc_chttp2_settings_create(uint32_t *old, const uint32_t *newval, uint32_t force_mask, size_t count); /* Create an ack settings frame */ grpc_slice grpc_chttp2_settings_ack_create(void); diff --git a/src/core/ext/transport/chttp2/transport/hpack_encoder.c b/src/core/ext/transport/chttp2/transport/hpack_encoder.c index 63df8e135f..84586cd998 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_encoder.c +++ b/src/core/ext/transport/chttp2/transport/hpack_encoder.c @@ -173,6 +173,7 @@ static void add_header_data(framer_state *st, grpc_slice slice) { static uint8_t *add_tiny_header_data(framer_state *st, size_t len) { ensure_space(st, len); + st->stats->header_bytes += len; return grpc_slice_buffer_tiny_add(st->output, len); } diff --git a/src/core/ext/transport/chttp2/transport/incoming_metadata.c b/src/core/ext/transport/chttp2/transport/incoming_metadata.c index c91b019aa0..da0a34d32a 100644 --- a/src/core/ext/transport/chttp2/transport/incoming_metadata.c +++ b/src/core/ext/transport/chttp2/transport/incoming_metadata.c @@ -41,69 +41,48 @@ #include <grpc/support/log.h> void grpc_chttp2_incoming_metadata_buffer_init( - grpc_chttp2_incoming_metadata_buffer *buffer) { - buffer->deadline = gpr_inf_future(GPR_CLOCK_REALTIME); + grpc_chttp2_incoming_metadata_buffer *buffer, gpr_arena *arena) { + buffer->arena = arena; + grpc_metadata_batch_init(&buffer->batch); + buffer->batch.deadline = gpr_inf_future(GPR_CLOCK_REALTIME); } void grpc_chttp2_incoming_metadata_buffer_destroy( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer) { - size_t i; - if (!buffer->published) { - for (i = 0; i < buffer->count; i++) { - GRPC_MDELEM_UNREF(exec_ctx, buffer->elems[i].md); - } - } - gpr_free(buffer->elems); + grpc_metadata_batch_destroy(exec_ctx, &buffer->batch); } -void grpc_chttp2_incoming_metadata_buffer_add( - grpc_chttp2_incoming_metadata_buffer *buffer, grpc_mdelem elem) { - GPR_ASSERT(!buffer->published); - if (buffer->capacity == buffer->count) { - buffer->capacity = GPR_MAX(8, 2 * buffer->capacity); - buffer->elems = - gpr_realloc(buffer->elems, sizeof(*buffer->elems) * buffer->capacity); - } - buffer->elems[buffer->count++].md = elem; +grpc_error *grpc_chttp2_incoming_metadata_buffer_add( + grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer, + grpc_mdelem elem) { buffer->size += GRPC_MDELEM_LENGTH(elem); + return grpc_metadata_batch_add_tail( + exec_ctx, &buffer->batch, + gpr_arena_alloc(buffer->arena, sizeof(grpc_linked_mdelem)), elem); } -void grpc_chttp2_incoming_metadata_buffer_replace_or_add( +grpc_error *grpc_chttp2_incoming_metadata_buffer_replace_or_add( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer, grpc_mdelem elem) { - for (size_t i = 0; i < buffer->count; i++) { - if (grpc_slice_eq(GRPC_MDKEY(buffer->elems[i].md), GRPC_MDKEY(elem))) { - GRPC_MDELEM_UNREF(exec_ctx, buffer->elems[i].md); - buffer->elems[i].md = elem; - return; + for (grpc_linked_mdelem *l = buffer->batch.list.head; l != NULL; + l = l->next) { + if (grpc_slice_eq(GRPC_MDKEY(l->md), GRPC_MDKEY(elem))) { + GRPC_MDELEM_UNREF(exec_ctx, l->md); + l->md = elem; + return GRPC_ERROR_NONE; } } - grpc_chttp2_incoming_metadata_buffer_add(buffer, elem); + return grpc_chttp2_incoming_metadata_buffer_add(exec_ctx, buffer, elem); } void grpc_chttp2_incoming_metadata_buffer_set_deadline( grpc_chttp2_incoming_metadata_buffer *buffer, gpr_timespec deadline) { - GPR_ASSERT(!buffer->published); - buffer->deadline = deadline; + buffer->batch.deadline = deadline; } void grpc_chttp2_incoming_metadata_buffer_publish( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer, grpc_metadata_batch *batch) { - GPR_ASSERT(!buffer->published); - buffer->published = 1; - if (buffer->count > 0) { - size_t i; - for (i = 0; i < buffer->count; i++) { - /* TODO(ctiller): do something better here */ - if (!GRPC_LOG_IF_ERROR("grpc_chttp2_incoming_metadata_buffer_publish", - grpc_metadata_batch_link_tail( - exec_ctx, batch, &buffer->elems[i]))) { - GRPC_MDELEM_UNREF(exec_ctx, buffer->elems[i].md); - } - } - } else { - batch->list.head = batch->list.tail = NULL; - } - batch->deadline = buffer->deadline; + *batch = buffer->batch; + grpc_metadata_batch_init(&buffer->batch); } diff --git a/src/core/ext/transport/chttp2/transport/incoming_metadata.h b/src/core/ext/transport/chttp2/transport/incoming_metadata.h index 1eac6fc150..288c917e65 100644 --- a/src/core/ext/transport/chttp2/transport/incoming_metadata.h +++ b/src/core/ext/transport/chttp2/transport/incoming_metadata.h @@ -37,28 +37,26 @@ #include "src/core/lib/transport/transport.h" typedef struct { - grpc_linked_mdelem *elems; - size_t count; - size_t capacity; - gpr_timespec deadline; - int published; + gpr_arena *arena; + grpc_metadata_batch batch; size_t size; // total size of metadata } grpc_chttp2_incoming_metadata_buffer; /** assumes everything initially zeroed */ void grpc_chttp2_incoming_metadata_buffer_init( - grpc_chttp2_incoming_metadata_buffer *buffer); + grpc_chttp2_incoming_metadata_buffer *buffer, gpr_arena *arena); void grpc_chttp2_incoming_metadata_buffer_destroy( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer); void grpc_chttp2_incoming_metadata_buffer_publish( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer, grpc_metadata_batch *batch); -void grpc_chttp2_incoming_metadata_buffer_add( - grpc_chttp2_incoming_metadata_buffer *buffer, grpc_mdelem elem); -void grpc_chttp2_incoming_metadata_buffer_replace_or_add( +grpc_error *grpc_chttp2_incoming_metadata_buffer_add( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer, - grpc_mdelem elem); + grpc_mdelem elem) GRPC_MUST_USE_RESULT; +grpc_error *grpc_chttp2_incoming_metadata_buffer_replace_or_add( + grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer, + grpc_mdelem elem) GRPC_MUST_USE_RESULT; void grpc_chttp2_incoming_metadata_buffer_set_deadline( grpc_chttp2_incoming_metadata_buffer *buffer, gpr_timespec deadline); diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 075d421dd4..3c56c21599 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -50,6 +50,7 @@ #include "src/core/ext/transport/chttp2/transport/stream_map.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/endpoint.h" +#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/transport/bdp_estimator.h" #include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/pid_controller.h" @@ -208,6 +209,12 @@ struct grpc_chttp2_incoming_byte_stream { grpc_closure finished_action; }; +typedef enum { + GRPC_CHTTP2_KEEPALIVE_STATE_WAITING, + GRPC_CHTTP2_KEEPALIVE_STATE_PINGING, + GRPC_CHTTP2_KEEPALIVE_STATE_DYING, +} grpc_chttp2_keepalive_state; + struct grpc_chttp2_transport { grpc_transport base; /* must be first */ gpr_refcount refs; @@ -271,10 +278,6 @@ struct grpc_chttp2_transport { /** data to write next write */ grpc_slice_buffer qbuf; - /** window available to announce to peer */ - int64_t announce_incoming_window; - /** how much window would we like to have for incoming_window */ - uint32_t connection_window_target; /** how much data are we willing to buffer when the WRITE_BUFFER_HINT is set? */ uint32_t write_buffer_size; @@ -328,6 +331,16 @@ struct grpc_chttp2_transport { /** window available for peer to send to us */ int64_t incoming_window; + /** calculating what we should give for incoming window: + we track the total amount of flow control over initial window size + across all streams: this is data that we want to receive right now (it + has an outstanding read) + and the total amount of flow control under initial window size across all + streams: this is data we've read early + we want to adjust incoming_window such that: + incoming_window = total_over - max(bdp - total_under, 0) */ + int64_t stream_total_over_incoming_window; + int64_t stream_total_under_incoming_window; /* deframing */ grpc_chttp2_deframe_transport_state deframe_state; @@ -376,6 +389,28 @@ struct grpc_chttp2_transport { grpc_closure benign_reclaimer_locked; /** destructive cleanup closure */ grpc_closure destructive_reclaimer_locked; + + /* keep-alive ping support */ + /** Closure to initialize a keepalive ping */ + grpc_closure init_keepalive_ping_locked; + /** Closure to run when the keepalive ping is sent */ + grpc_closure start_keepalive_ping_locked; + /** Cousure to run when the keepalive ping ack is received */ + grpc_closure finish_keepalive_ping_locked; + /** Closrue to run when the keepalive ping timeouts */ + grpc_closure keepalive_watchdog_fired_locked; + /** timer to initiate ping events */ + grpc_timer keepalive_ping_timer; + /** watchdog to kill the transport when waiting for the keepalive ping */ + grpc_timer keepalive_watchdog_timer; + /** time duration in between pings */ + gpr_timespec keepalive_time; + /** grace period for a ping to complete before watchdog kicks in */ + gpr_timespec keepalive_timeout; + /** if keepalive pings are allowed when there's no outstanding streams */ + bool keepalive_permit_without_calls; + /** keep-alive state machine state */ + grpc_chttp2_keepalive_state keepalive_state; }; typedef enum { @@ -390,7 +425,7 @@ struct grpc_chttp2_stream { grpc_stream_refcount *refcount; grpc_closure destroy_stream; - void *destroy_stream_arg; + grpc_closure *destroy_stream_arg; grpc_chttp2_stream_link links[STREAM_LIST_COUNT]; uint8_t included[STREAM_LIST_COUNT]; @@ -634,6 +669,44 @@ typedef enum { GRPC_CHTTP2_FLOW_CREDIT_COMMON(phase, dst_context, 0, dst_context, dst_var, \ amount) +#define GRPC_CHTTP2_FLOW_STREAM_INCOMING_WINDOW_DELTA_PREUPDATE( \ + phase, transport, dst_context) \ + if (dst_context->incoming_window_delta < 0) { \ + transport->stream_total_under_incoming_window += \ + dst_context->incoming_window_delta; \ + } else if (dst_context->incoming_window_delta > 0) { \ + transport->stream_total_over_incoming_window -= \ + dst_context->incoming_window_delta; \ + } + +#define GRPC_CHTTP2_FLOW_STREAM_INCOMING_WINDOW_DELTA_POSTUPDATE( \ + phase, transport, dst_context) \ + if (dst_context->incoming_window_delta < 0) { \ + transport->stream_total_under_incoming_window -= \ + dst_context->incoming_window_delta; \ + } else if (dst_context->incoming_window_delta > 0) { \ + transport->stream_total_over_incoming_window += \ + dst_context->incoming_window_delta; \ + } + +#define GRPC_CHTTP2_FLOW_DEBIT_STREAM_INCOMING_WINDOW_DELTA( \ + phase, transport, dst_context, amount) \ + GRPC_CHTTP2_FLOW_STREAM_INCOMING_WINDOW_DELTA_PREUPDATE(phase, transport, \ + dst_context); \ + GRPC_CHTTP2_FLOW_DEBIT_STREAM(phase, transport, dst_context, \ + incoming_window_delta, amount); \ + GRPC_CHTTP2_FLOW_STREAM_INCOMING_WINDOW_DELTA_POSTUPDATE(phase, transport, \ + dst_context); + +#define GRPC_CHTTP2_FLOW_CREDIT_STREAM_INCOMING_WINDOW_DELTA( \ + phase, transport, dst_context, amount) \ + GRPC_CHTTP2_FLOW_STREAM_INCOMING_WINDOW_DELTA_PREUPDATE(phase, transport, \ + dst_context); \ + GRPC_CHTTP2_FLOW_CREDIT_STREAM(phase, transport, dst_context, \ + incoming_window_delta, amount); \ + GRPC_CHTTP2_FLOW_STREAM_INCOMING_WINDOW_DELTA_POSTUPDATE(phase, transport, \ + dst_context); + #define GRPC_CHTTP2_FLOW_DEBIT_COMMON(phase, transport, id, dst_context, \ dst_var, amount) \ do { \ @@ -752,4 +825,6 @@ void grpc_chttp2_fail_pending_writes(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_error *error); +uint32_t grpc_chttp2_target_incoming_window(grpc_chttp2_transport *t); + #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H */ diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c index 24bd93067b..7efc8c63c9 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.c +++ b/src/core/ext/transport/chttp2/transport/parsing.c @@ -376,34 +376,47 @@ static grpc_error *update_incoming_window(grpc_exec_ctx *exec_ctx, return err; } - uint32_t target_incoming_window = GPR_MAX( - t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], - 1024); - GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("parse", t, incoming_window, - incoming_frame_size); - if (t->incoming_window <= target_incoming_window / 2) { - grpc_chttp2_initiate_write(exec_ctx, t, false, "flow_control"); - } - if (s != NULL) { if (incoming_frame_size > s->incoming_window_delta + t->settings[GRPC_ACKED_SETTINGS] [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]) { - char *msg; - gpr_asprintf(&msg, - "frame of size %d overflows incoming window of %" PRId64, - t->incoming_frame_size, - s->incoming_window_delta + - t->settings[GRPC_ACKED_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]); - grpc_error *err = GRPC_ERROR_CREATE(msg); - gpr_free(msg); - return err; + if (incoming_frame_size <= + s->incoming_window_delta + + t->settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]) { + gpr_log( + GPR_ERROR, + "Incoming frame of size %d exceeds incoming window size of %" PRId64 + ".\n" + "The (un-acked, future) window size would be %" PRId64 + " which is not exceeded.\n" + "This would usually cause a disconnection, but allowing it due to " + "broken HTTP2 implementations in the wild.\n" + "See (for example) https://github.com/netty/netty/issues/6520.", + t->incoming_frame_size, + s->incoming_window_delta + + t->settings[GRPC_ACKED_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], + s->incoming_window_delta + + t->settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]); + } else { + char *msg; + gpr_asprintf(&msg, + "frame of size %d overflows incoming window of %" PRId64, + t->incoming_frame_size, + s->incoming_window_delta + + t->settings[GRPC_ACKED_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]); + grpc_error *err = GRPC_ERROR_CREATE(msg); + gpr_free(msg); + return err; + } } - GRPC_CHTTP2_FLOW_DEBIT_STREAM("parse", t, s, incoming_window_delta, - incoming_frame_size); + GRPC_CHTTP2_FLOW_DEBIT_STREAM_INCOMING_WINDOW_DELTA("parse", t, s, + incoming_frame_size); if ((int64_t)t->settings[GRPC_SENT_SETTINGS] [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] + (int64_t)s->incoming_window_delta - (int64_t)s->announce_window <= @@ -417,6 +430,13 @@ static grpc_error *update_incoming_window(grpc_exec_ctx *exec_ctx, s->received_bytes += incoming_frame_size; } + uint32_t target_incoming_window = grpc_chttp2_target_incoming_window(t); + GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("parse", t, incoming_window, + incoming_frame_size); + if (t->incoming_window <= target_incoming_window / 2) { + grpc_chttp2_initiate_write(exec_ctx, t, false, "flow_control"); + } + return GRPC_ERROR_NONE; } @@ -528,7 +548,14 @@ static void on_initial_header(grpc_exec_ctx *exec_ctx, void *tp, s->seen_error = true; GRPC_MDELEM_UNREF(exec_ctx, md); } else { - grpc_chttp2_incoming_metadata_buffer_add(&s->metadata_buffer[0], md); + grpc_error *error = grpc_chttp2_incoming_metadata_buffer_add( + exec_ctx, &s->metadata_buffer[0], md); + if (error != GRPC_ERROR_NONE) { + grpc_chttp2_cancel_stream(exec_ctx, t, s, error); + grpc_chttp2_parsing_become_skip_parser(exec_ctx, t); + s->seen_error = true; + GRPC_MDELEM_UNREF(exec_ctx, md); + } } } @@ -578,7 +605,14 @@ static void on_trailing_header(grpc_exec_ctx *exec_ctx, void *tp, s->seen_error = true; GRPC_MDELEM_UNREF(exec_ctx, md); } else { - grpc_chttp2_incoming_metadata_buffer_add(&s->metadata_buffer[1], md); + grpc_error *error = grpc_chttp2_incoming_metadata_buffer_add( + exec_ctx, &s->metadata_buffer[1], md); + if (error != GRPC_ERROR_NONE) { + grpc_chttp2_cancel_stream(exec_ctx, t, s, error); + grpc_chttp2_parsing_become_skip_parser(exec_ctx, t); + s->seen_error = true; + GRPC_MDELEM_UNREF(exec_ctx, md); + } } GPR_TIMER_END("on_trailing_header", 0); diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index 05e6f59947..2b9d93cae7 100644 --- a/src/core/ext/transport/chttp2/transport/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -152,6 +152,17 @@ static bool stream_ref_if_not_destroyed(gpr_refcount *r) { return true; } +uint32_t grpc_chttp2_target_incoming_window(grpc_chttp2_transport *t) { + return (uint32_t)GPR_MAX( + (int64_t)((1u << 31) - 1), + t->stream_total_over_incoming_window + + (int64_t)GPR_MAX( + t->settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] - + t->stream_total_under_incoming_window, + 0)); +} + bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { grpc_chttp2_stream *s; @@ -310,13 +321,12 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx, /* if the grpc_chttp2_transport is ready to send a window update, do so here also; 3/4 is a magic number that will likely get tuned soon */ - uint32_t target_incoming_window = GPR_MAX( - t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], - 1024); + uint32_t target_incoming_window = grpc_chttp2_target_incoming_window(t); uint32_t threshold_to_send_transport_window_update = t->outbuf.count > 0 ? 3 * target_incoming_window / 4 : target_incoming_window / 2; - if (t->incoming_window <= threshold_to_send_transport_window_update) { + if (t->incoming_window <= threshold_to_send_transport_window_update && + t->incoming_window != target_incoming_window) { maybe_initiate_ping(exec_ctx, t, GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE); uint32_t announced = (uint32_t)GPR_CLAMP( diff --git a/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c b/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c index 477cf07f45..b6e9e845df 100644 --- a/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c +++ b/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c @@ -39,6 +39,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include "src/core/ext/transport/cronet/transport/cronet_transport.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/transport/transport_impl.h" @@ -54,16 +55,14 @@ extern grpc_transport_vtable grpc_cronet_vtable; GRPCAPI grpc_channel *grpc_cronet_secure_channel_create( void *engine, const char *target, const grpc_channel_args *args, void *reserved) { - cronet_transport *ct = gpr_malloc(sizeof(cronet_transport)); - ct->base.vtable = &grpc_cronet_vtable; - ct->engine = engine; - ct->host = gpr_malloc(strlen(target) + 1); - strcpy(ct->host, target); gpr_log(GPR_DEBUG, "grpc_create_cronet_transport: stream_engine = %p, target=%s", engine, - ct->host); + target); + + grpc_transport *ct = + grpc_create_cronet_transport(engine, target, args, reserved); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; return grpc_channel_create(&exec_ctx, target, args, - GRPC_CLIENT_DIRECT_CHANNEL, (grpc_transport *)ct); + GRPC_CLIENT_DIRECT_CHANNEL, ct); } diff --git a/src/core/ext/transport/cronet/transport/cronet_api_dummy.c b/src/core/ext/transport/cronet/transport/cronet_api_dummy.c index da6c0b4fbc..0dc6a5152f 100644 --- a/src/core/ext/transport/cronet/transport/cronet_api_dummy.c +++ b/src/core/ext/transport/cronet/transport/cronet_api_dummy.c @@ -80,4 +80,16 @@ void bidirectional_stream_cancel(bidirectional_stream* stream) { GPR_ASSERT(0); } +void bidirectional_stream_disable_auto_flush(bidirectional_stream* stream, + bool disable_auto_flush) { + GPR_ASSERT(0); +} + +void bidirectional_stream_delay_request_headers_until_flush( + bidirectional_stream* stream, bool delay_headers_until_flush) { + GPR_ASSERT(0); +} + +void bidirectional_stream_flush(bidirectional_stream* stream) { GPR_ASSERT(0); } + #endif /* GRPC_COMPILE_WITH_CRONET */ diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index d755b1f147..450d9ab23a 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -54,6 +54,7 @@ #include "third_party/objective_c/Cronet/bidirectional_stream_c.h" #define GRPC_HEADER_SIZE_IN_BYTES 5 +#define GRPC_FLUSH_READ_SIZE 4096 #define CRONET_LOG(...) \ do { \ @@ -88,7 +89,7 @@ enum e_op_id { /* Cronet callbacks. See cronet_c_for_grpc.h for documentation for each. */ -static void on_request_headers_sent(bidirectional_stream *); +static void on_stream_ready(bidirectional_stream *); static void on_response_headers_received( bidirectional_stream *, const bidirectional_stream_header_array *, const char *); @@ -100,7 +101,7 @@ static void on_succeeded(bidirectional_stream *); static void on_failed(bidirectional_stream *, int); static void on_canceled(bidirectional_stream *); static bidirectional_stream_callback cronet_callbacks = { - on_request_headers_sent, + on_stream_ready, on_response_headers_received, on_read_completed, on_write_completed, @@ -114,6 +115,7 @@ struct grpc_cronet_transport { grpc_transport base; /* must be first element in this structure */ stream_engine *engine; char *host; + bool use_packet_coalescing; }; typedef struct grpc_cronet_transport grpc_cronet_transport; @@ -150,8 +152,17 @@ struct write_state { struct op_state { bool state_op_done[OP_NUM_OPS]; bool state_callback_received[OP_NUM_OPS]; + /* A non-zero gRPC status code has been seen */ bool fail_state; + /* Transport is discarding all buffered messages */ bool flush_read; + bool flush_cronet_when_ready; + bool pending_write_for_trailer; + bool pending_send_message; + /* User requested RECV_TRAILING_METADATA */ + bool pending_recv_trailing_metadata; + /* Cronet has not issued a callback of a bidirectional read */ + bool pending_read_from_cronet; grpc_error *cancel_error; /* data structure for storing data coming from server */ struct read_state rs; @@ -173,9 +184,10 @@ struct op_storage { }; struct stream_obj { + gpr_arena *arena; struct op_and_state *oas; grpc_transport_stream_op *curr_op; - grpc_cronet_transport curr_ct; + grpc_cronet_transport *curr_ct; grpc_stream *curr_gs; bidirectional_stream *cbs; bidirectional_stream_header_array header_array; @@ -244,11 +256,35 @@ static const char *op_id_string(enum e_op_id i) { return "UNKNOWN"; } -static void free_read_buffer(stream_obj *s) { +static void null_and_maybe_free_read_buffer(stream_obj *s) { if (s->state.rs.read_buffer && s->state.rs.read_buffer != s->state.rs.grpc_header_bytes) { gpr_free(s->state.rs.read_buffer); - s->state.rs.read_buffer = NULL; + } + s->state.rs.read_buffer = NULL; +} + +static void maybe_flush_read(stream_obj *s) { + /* To enter flush read state (discarding all the buffered messages in + * transport layer), two conditions must be satisfied: 1) non-zero grpc status + * has been received, and 2) an op requesting the status code + * (RECV_TRAILING_METADATA) is issued by the user. (See + * doc/status_ordering.md) */ + /* Whenever the evaluation of any of the two condition is changed, we check + * whether we should enter the flush read state. */ + if (s->state.pending_recv_trailing_metadata && s->state.fail_state) { + if (!s->state.flush_read && !s->state.rs.read_stream_closed) { + CRONET_LOG(GPR_DEBUG, "%p: Flush read", s); + s->state.flush_read = true; + null_and_maybe_free_read_buffer(s); + s->state.rs.read_buffer = gpr_malloc(GRPC_FLUSH_READ_SIZE); + if (!s->state.pending_read_from_cronet) { + CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs); + bidirectional_stream_read(s->cbs, s->state.rs.read_buffer, + GRPC_FLUSH_READ_SIZE); + s->state.pending_read_from_cronet = true; + } + } } } @@ -274,6 +310,13 @@ static void add_to_storage(struct stream_obj *s, grpc_transport_stream_op *op) { new_op->next = storage->head; storage->head = new_op; storage->num_pending_ops++; + if (op->send_message) { + s->state.pending_send_message = true; + } + if (op->recv_trailing_metadata) { + s->state.pending_recv_trailing_metadata = true; + maybe_flush_read(s); + } CRONET_LOG(GPR_DEBUG, "adding new op %p. %d in the queue.", new_op, storage->num_pending_ops); gpr_mu_unlock(&s->mu); @@ -360,7 +403,7 @@ static void on_failed(bidirectional_stream *stream, int net_error) { gpr_free(s->state.ws.write_buffer); s->state.ws.write_buffer = NULL; } - free_read_buffer(s); + null_and_maybe_free_read_buffer(s); gpr_mu_unlock(&s->mu); execute_from_storage(s); } @@ -383,7 +426,7 @@ static void on_canceled(bidirectional_stream *stream) { gpr_free(s->state.ws.write_buffer); s->state.ws.write_buffer = NULL; } - free_read_buffer(s); + null_and_maybe_free_read_buffer(s); gpr_mu_unlock(&s->mu); execute_from_storage(s); } @@ -398,7 +441,7 @@ static void on_succeeded(bidirectional_stream *stream) { bidirectional_stream_destroy(s->cbs); s->state.state_callback_received[OP_SUCCEEDED] = true; s->cbs = NULL; - free_read_buffer(s); + null_and_maybe_free_read_buffer(s); gpr_mu_unlock(&s->mu); execute_from_storage(s); } @@ -406,9 +449,10 @@ static void on_succeeded(bidirectional_stream *stream) { /* Cronet callback */ -static void on_request_headers_sent(bidirectional_stream *stream) { - CRONET_LOG(GPR_DEBUG, "W: on_request_headers_sent(%p)", stream); +static void on_stream_ready(bidirectional_stream *stream) { + CRONET_LOG(GPR_DEBUG, "W: on_stream_ready(%p)", stream); stream_obj *s = (stream_obj *)stream->annotation; + grpc_cronet_transport *t = (grpc_cronet_transport *)s->curr_ct; gpr_mu_lock(&s->mu); s->state.state_op_done[OP_SEND_INITIAL_METADATA] = true; s->state.state_callback_received[OP_SEND_INITIAL_METADATA] = true; @@ -417,6 +461,14 @@ static void on_request_headers_sent(bidirectional_stream *stream) { gpr_free(s->header_array.headers); s->header_array.headers = NULL; } + /* Send the initial metadata on wire if there is no SEND_MESSAGE or + * SEND_TRAILING_METADATA ops pending */ + if (t->use_packet_coalescing) { + if (s->state.flush_cronet_when_ready) { + CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_flush (%p)", s->cbs); + bidirectional_stream_flush(stream); + } + } gpr_mu_unlock(&s->mu); execute_from_storage(s); } @@ -435,15 +487,18 @@ static void on_response_headers_received( gpr_mu_lock(&s->mu); memset(&s->state.rs.initial_metadata, 0, sizeof(s->state.rs.initial_metadata)); - grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.initial_metadata); + grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.initial_metadata, + s->arena); for (size_t i = 0; i < headers->count; i++) { - grpc_chttp2_incoming_metadata_buffer_add( - &s->state.rs.initial_metadata, - grpc_mdelem_from_slices( - &exec_ctx, grpc_slice_intern(grpc_slice_from_static_string( - headers->headers[i].key)), - grpc_slice_intern( - grpc_slice_from_static_string(headers->headers[i].value)))); + GRPC_LOG_IF_ERROR( + "on_response_headers_received", + grpc_chttp2_incoming_metadata_buffer_add( + &exec_ctx, &s->state.rs.initial_metadata, + grpc_mdelem_from_slices( + &exec_ctx, grpc_slice_intern(grpc_slice_from_static_string( + headers->headers[i].key)), + grpc_slice_intern(grpc_slice_from_static_string( + headers->headers[i].value))))); } s->state.state_callback_received[OP_RECV_INITIAL_METADATA] = true; if (!(s->state.state_op_done[OP_CANCEL_ERROR] || @@ -457,6 +512,7 @@ static void on_response_headers_received( CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs); bidirectional_stream_read(s->cbs, s->state.rs.read_buffer, s->state.rs.remaining_bytes); + s->state.pending_read_from_cronet = true; } gpr_mu_unlock(&s->mu); grpc_exec_ctx_finish(&exec_ctx); @@ -488,10 +544,13 @@ static void on_read_completed(bidirectional_stream *stream, char *data, CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data, count); gpr_mu_lock(&s->mu); + s->state.pending_read_from_cronet = false; s->state.state_callback_received[OP_RECV_MESSAGE] = true; if (count > 0 && s->state.flush_read) { CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs); - bidirectional_stream_read(s->cbs, s->state.rs.read_buffer, 4096); + bidirectional_stream_read(s->cbs, s->state.rs.read_buffer, + GRPC_FLUSH_READ_SIZE); + s->state.pending_read_from_cronet = true; gpr_mu_unlock(&s->mu); } else if (count > 0) { s->state.rs.received_bytes += count; @@ -502,16 +561,14 @@ static void on_read_completed(bidirectional_stream *stream, char *data, bidirectional_stream_read( s->cbs, s->state.rs.read_buffer + s->state.rs.received_bytes, s->state.rs.remaining_bytes); + s->state.pending_read_from_cronet = true; gpr_mu_unlock(&s->mu); } else { gpr_mu_unlock(&s->mu); execute_from_storage(s); } } else { - if (s->state.flush_read) { - gpr_free(s->state.rs.read_buffer); - s->state.rs.read_buffer = NULL; - } + null_and_maybe_free_read_buffer(s); s->state.rs.read_stream_closed = true; gpr_mu_unlock(&s->mu); execute_from_storage(s); @@ -528,25 +585,30 @@ static void on_response_trailers_received( CRONET_LOG(GPR_DEBUG, "R: on_response_trailers_received(%p,%p)", stream, trailers); stream_obj *s = (stream_obj *)stream->annotation; + grpc_cronet_transport *t = (grpc_cronet_transport *)s->curr_ct; gpr_mu_lock(&s->mu); memset(&s->state.rs.trailing_metadata, 0, sizeof(s->state.rs.trailing_metadata)); s->state.rs.trailing_metadata_valid = false; - grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.trailing_metadata); + grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.trailing_metadata, + s->arena); for (size_t i = 0; i < trailers->count; i++) { CRONET_LOG(GPR_DEBUG, "trailer key=%s, value=%s", trailers->headers[i].key, trailers->headers[i].value); - grpc_chttp2_incoming_metadata_buffer_add( - &s->state.rs.trailing_metadata, - grpc_mdelem_from_slices( - &exec_ctx, grpc_slice_intern(grpc_slice_from_static_string( - trailers->headers[i].key)), - grpc_slice_intern( - grpc_slice_from_static_string(trailers->headers[i].value)))); + GRPC_LOG_IF_ERROR( + "on_response_trailers_received", + grpc_chttp2_incoming_metadata_buffer_add( + &exec_ctx, &s->state.rs.trailing_metadata, + grpc_mdelem_from_slices( + &exec_ctx, grpc_slice_intern(grpc_slice_from_static_string( + trailers->headers[i].key)), + grpc_slice_intern(grpc_slice_from_static_string( + trailers->headers[i].value))))); s->state.rs.trailing_metadata_valid = true; if (0 == strcmp(trailers->headers[i].key, "grpc-status") && 0 != strcmp(trailers->headers[i].value, "0")) { s->state.fail_state = true; + maybe_flush_read(s); } } s->state.state_callback_received[OP_RECV_TRAILING_METADATA] = true; @@ -558,6 +620,10 @@ static void on_response_trailers_received( CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs); s->state.state_callback_received[OP_SEND_MESSAGE] = false; bidirectional_stream_write(s->cbs, "", 0, true); + if (t->use_packet_coalescing) { + CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs); + bidirectional_stream_flush(s->cbs); + } s->state.state_op_done[OP_SEND_TRAILING_METADATA] = true; gpr_mu_unlock(&s->mu); @@ -607,7 +673,7 @@ static void convert_metadata_to_cronet_headers( curr = curr->next; num_headers_available++; } - /* Allocate enough memory. It is freed in the on_request_headers_sent callback + /* Allocate enough memory. It is freed in the on_stream_ready callback */ bidirectional_stream_header *headers = (bidirectional_stream_header *)gpr_malloc( @@ -687,8 +753,10 @@ static bool header_has_authority(grpc_linked_mdelem *head) { executed. This is the heart of the state machine. */ static bool op_can_be_run(grpc_transport_stream_op *curr_op, - struct op_state *stream_state, - struct op_state *op_state, enum e_op_id op_id) { + struct stream_obj *s, struct op_state *op_state, + enum e_op_id op_id) { + struct op_state *stream_state = &s->state; + grpc_cronet_transport *t = s->curr_ct; bool result = true; /* When call is canceled, every op can be run, except under following conditions @@ -755,12 +823,14 @@ static bool op_can_be_run(grpc_transport_stream_op *curr_op, else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) result = false; /* we haven't sent message yet */ - else if (curr_op->send_message && + else if (stream_state->pending_send_message && !stream_state->state_op_done[OP_SEND_MESSAGE]) result = false; /* we haven't got on_write_completed for the send yet */ else if (stream_state->state_op_done[OP_SEND_MESSAGE] && - !stream_state->state_callback_received[OP_SEND_MESSAGE]) + !stream_state->state_callback_received[OP_SEND_MESSAGE] && + !(t->use_packet_coalescing && + stream_state->pending_write_for_trailer)) result = false; } else if (op_id == OP_CANCEL_ERROR) { /* already executed */ @@ -833,24 +903,28 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, struct op_and_state *oas) { grpc_transport_stream_op *stream_op = &oas->op; struct stream_obj *s = oas->s; + grpc_cronet_transport *t = (grpc_cronet_transport *)s->curr_ct; struct op_state *stream_state = &s->state; enum e_op_result result = NO_ACTION_POSSIBLE; if (stream_op->send_initial_metadata && - op_can_be_run(stream_op, stream_state, &oas->state, - OP_SEND_INITIAL_METADATA)) { + op_can_be_run(stream_op, s, &oas->state, OP_SEND_INITIAL_METADATA)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_INITIAL_METADATA", oas); /* Start new cronet stream. It is destroyed in on_succeeded, on_canceled, * on_failed */ GPR_ASSERT(s->cbs == NULL); GPR_ASSERT(!stream_state->state_op_done[OP_SEND_INITIAL_METADATA]); - s->cbs = bidirectional_stream_create(s->curr_ct.engine, s->curr_gs, - &cronet_callbacks); + s->cbs = + bidirectional_stream_create(t->engine, s->curr_gs, &cronet_callbacks); CRONET_LOG(GPR_DEBUG, "%p = bidirectional_stream_create()", s->cbs); + if (t->use_packet_coalescing) { + bidirectional_stream_disable_auto_flush(s->cbs, true); + bidirectional_stream_delay_request_headers_until_flush(s->cbs, true); + } char *url = NULL; const char *method = "POST"; s->header_array.headers = NULL; convert_metadata_to_cronet_headers( - stream_op->send_initial_metadata->list.head, s->curr_ct.host, &url, + stream_op->send_initial_metadata->list.head, t->host, &url, &s->header_array.headers, &s->header_array.count, &method); s->header_array.capacity = s->header_array.count; CRONET_LOG(GPR_DEBUG, "bidirectional_stream_start(%p, %s)", s->cbs, url); @@ -862,30 +936,16 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, gpr_free((void *)s->header_array.headers[header_index].value); } stream_state->state_op_done[OP_SEND_INITIAL_METADATA] = true; - result = ACTION_TAKEN_WITH_CALLBACK; - } else if (stream_op->recv_initial_metadata && - op_can_be_run(stream_op, stream_state, &oas->state, - OP_RECV_INITIAL_METADATA)) { - CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas); - if (stream_state->state_op_done[OP_CANCEL_ERROR]) { - grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready, - GRPC_ERROR_NONE); - } else if (stream_state->state_callback_received[OP_FAILED]) { - grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready, - GRPC_ERROR_NONE); - } else { - grpc_chttp2_incoming_metadata_buffer_publish( - exec_ctx, &oas->s->state.rs.initial_metadata, - stream_op->recv_initial_metadata); - grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready, - GRPC_ERROR_NONE); + if (t->use_packet_coalescing) { + if (!stream_op->send_message && !stream_op->send_trailing_metadata) { + s->state.flush_cronet_when_ready = true; + } } - stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true; - result = ACTION_TAKEN_NO_CALLBACK; + result = ACTION_TAKEN_WITH_CALLBACK; } else if (stream_op->send_message && - op_can_be_run(stream_op, stream_state, &oas->state, - OP_SEND_MESSAGE)) { + op_can_be_run(stream_op, s, &oas->state, OP_SEND_MESSAGE)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_MESSAGE", oas); + stream_state->pending_send_message = false; if (stream_state->state_callback_received[OP_FAILED]) { result = NO_ACTION_POSSIBLE; CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed."); @@ -916,16 +976,63 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, stream_state->state_callback_received[OP_SEND_MESSAGE] = false; bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer, (int)write_buffer_size, false); - result = ACTION_TAKEN_WITH_CALLBACK; + if (t->use_packet_coalescing) { + if (!stream_op->send_trailing_metadata) { + CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs); + bidirectional_stream_flush(s->cbs); + result = ACTION_TAKEN_WITH_CALLBACK; + } else { + stream_state->pending_write_for_trailer = true; + result = ACTION_TAKEN_NO_CALLBACK; + } + } else { + result = ACTION_TAKEN_WITH_CALLBACK; + } } else { result = NO_ACTION_POSSIBLE; } } stream_state->state_op_done[OP_SEND_MESSAGE] = true; oas->state.state_op_done[OP_SEND_MESSAGE] = true; + } else if (stream_op->send_trailing_metadata && + op_can_be_run(stream_op, s, &oas->state, + OP_SEND_TRAILING_METADATA)) { + CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_TRAILING_METADATA", oas); + if (stream_state->state_callback_received[OP_FAILED]) { + result = NO_ACTION_POSSIBLE; + CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed."); + } else { + CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs); + stream_state->state_callback_received[OP_SEND_MESSAGE] = false; + bidirectional_stream_write(s->cbs, "", 0, true); + if (t->use_packet_coalescing) { + CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs); + bidirectional_stream_flush(s->cbs); + } + result = ACTION_TAKEN_WITH_CALLBACK; + } + stream_state->state_op_done[OP_SEND_TRAILING_METADATA] = true; + } else if (stream_op->recv_initial_metadata && + op_can_be_run(stream_op, s, &oas->state, + OP_RECV_INITIAL_METADATA)) { + CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas); + if (stream_state->state_op_done[OP_CANCEL_ERROR]) { + grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready, + GRPC_ERROR_NONE); + } else if (stream_state->state_callback_received[OP_FAILED]) { + grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready, + GRPC_ERROR_NONE); + } else { + grpc_chttp2_incoming_metadata_buffer_publish( + exec_ctx, &oas->s->state.rs.initial_metadata, + stream_op->recv_initial_metadata); + grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready, + GRPC_ERROR_NONE); + } + stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true; + result = ACTION_TAKEN_NO_CALLBACK; } else if (stream_op->recv_message && - op_can_be_run(stream_op, stream_state, &oas->state, - OP_RECV_MESSAGE)) { + op_can_be_run(stream_op, s, &oas->state, OP_RECV_MESSAGE)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_MESSAGE", oas); if (stream_state->state_op_done[OP_CANCEL_ERROR]) { CRONET_LOG(GPR_DEBUG, "Stream is cancelled."); @@ -947,6 +1054,13 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, stream_state->state_op_done[OP_RECV_MESSAGE] = true; oas->state.state_op_done[OP_RECV_MESSAGE] = true; result = ACTION_TAKEN_NO_CALLBACK; + } else if (stream_state->flush_read) { + CRONET_LOG(GPR_DEBUG, "flush read"); + grpc_closure_sched(exec_ctx, stream_op->recv_message_ready, + GRPC_ERROR_NONE); + stream_state->state_op_done[OP_RECV_MESSAGE] = true; + oas->state.state_op_done[OP_RECV_MESSAGE] = true; + result = ACTION_TAKEN_NO_CALLBACK; } else if (stream_state->rs.length_field_received == false) { if (stream_state->rs.received_bytes == GRPC_HEADER_SIZE_IN_BYTES && stream_state->rs.remaining_bytes == 0) { @@ -967,6 +1081,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, true; /* Indicates that at least one read request has been made */ bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer, stream_state->rs.remaining_bytes); + stream_state->pending_read_from_cronet = true; result = ACTION_TAKEN_WITH_CALLBACK; } else { stream_state->rs.remaining_bytes = 0; @@ -980,6 +1095,18 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; oas->state.state_op_done[OP_RECV_MESSAGE] = true; + + /* Extra read to trigger on_succeed */ + stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes; + stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES; + stream_state->rs.received_bytes = 0; + stream_state->rs.length_field_received = false; + CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs); + stream_state->state_op_done[OP_READ_REQ_MADE] = + true; /* Indicates that at least one read request has been made */ + bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer, + stream_state->rs.remaining_bytes); + stream_state->pending_read_from_cronet = true; result = ACTION_TAKEN_NO_CALLBACK; } } else if (stream_state->rs.remaining_bytes == 0) { @@ -992,6 +1119,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, true; /* Indicates that at least one read request has been made */ bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer, stream_state->rs.remaining_bytes); + stream_state->pending_read_from_cronet = true; result = ACTION_TAKEN_WITH_CALLBACK; } else { result = NO_ACTION_POSSIBLE; @@ -1003,7 +1131,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, uint8_t *dst_p = GRPC_SLICE_START_PTR(read_data_slice); memcpy(dst_p, stream_state->rs.read_buffer, (size_t)stream_state->rs.length_field); - free_read_buffer(s); + null_and_maybe_free_read_buffer(s); grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer); grpc_slice_buffer_add(&stream_state->rs.read_slice_buffer, read_data_slice); @@ -1024,10 +1152,11 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs); bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer, stream_state->rs.remaining_bytes); + stream_state->pending_read_from_cronet = true; result = ACTION_TAKEN_NO_CALLBACK; } } else if (stream_op->recv_trailing_metadata && - op_can_be_run(stream_op, stream_state, &oas->state, + op_can_be_run(stream_op, s, &oas->state, OP_RECV_TRAILING_METADATA)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_TRAILING_METADATA", oas); if (oas->s->state.rs.trailing_metadata_valid) { @@ -1038,23 +1167,8 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, } stream_state->state_op_done[OP_RECV_TRAILING_METADATA] = true; result = ACTION_TAKEN_NO_CALLBACK; - } else if (stream_op->send_trailing_metadata && - op_can_be_run(stream_op, stream_state, &oas->state, - OP_SEND_TRAILING_METADATA)) { - CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_TRAILING_METADATA", oas); - if (stream_state->state_callback_received[OP_FAILED]) { - result = NO_ACTION_POSSIBLE; - CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed."); - } else { - CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs); - stream_state->state_callback_received[OP_SEND_MESSAGE] = false; - bidirectional_stream_write(s->cbs, "", 0, true); - result = ACTION_TAKEN_WITH_CALLBACK; - } - stream_state->state_op_done[OP_SEND_TRAILING_METADATA] = true; } else if (stream_op->cancel_error && - op_can_be_run(stream_op, stream_state, &oas->state, - OP_CANCEL_ERROR)) { + op_can_be_run(stream_op, s, &oas->state, OP_CANCEL_ERROR)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_CANCEL_ERROR", oas); CRONET_LOG(GPR_DEBUG, "W: bidirectional_stream_cancel(%p)", s->cbs); if (s->cbs) { @@ -1068,8 +1182,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, stream_state->cancel_error = GRPC_ERROR_REF(stream_op->cancel_error); } } else if (stream_op->on_complete && - op_can_be_run(stream_op, stream_state, &oas->state, - OP_ON_COMPLETE)) { + op_can_be_run(stream_op, s, &oas->state, OP_ON_COMPLETE)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_ON_COMPLETE", oas); if (stream_state->state_op_done[OP_CANCEL_ERROR]) { grpc_closure_sched(exec_ctx, stream_op->on_complete, @@ -1097,15 +1210,6 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, make a note */ if (stream_op->recv_message) stream_state->state_op_done[OP_RECV_MESSAGE_AND_ON_COMPLETE] = true; - } else if (stream_state->fail_state && !stream_state->flush_read) { - CRONET_LOG(GPR_DEBUG, "running: %p flush read", oas); - if (stream_state->rs.read_buffer && - stream_state->rs.read_buffer != stream_state->rs.grpc_header_bytes) { - gpr_free(stream_state->rs.read_buffer); - stream_state->rs.read_buffer = NULL; - } - stream_state->rs.read_buffer = gpr_malloc(4096); - stream_state->flush_read = true; } else { result = NO_ACTION_POSSIBLE; } @@ -1118,7 +1222,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_stream *gs, grpc_stream_refcount *refcount, - const void *server_data) { + const void *server_data, gpr_arena *arena) { stream_obj *s = (stream_obj *)gs; memset(&s->storage, 0, sizeof(s->storage)); s->storage.head = NULL; @@ -1133,6 +1237,15 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, sizeof(s->state.state_callback_received)); s->state.fail_state = s->state.flush_read = false; s->state.cancel_error = NULL; + s->state.flush_cronet_when_ready = s->state.pending_write_for_trailer = false; + s->state.pending_send_message = false; + s->state.pending_recv_trailing_metadata = false; + s->state.pending_read_from_cronet = false; + + s->curr_gs = gs; + s->curr_ct = (grpc_cronet_transport *)gt; + s->arena = arena; + gpr_mu_init(&s->mu); return 0; } @@ -1147,40 +1260,33 @@ static void set_pollset_set_do_nothing(grpc_exec_ctx *exec_ctx, static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_stream *gs, grpc_transport_stream_op *op) { CRONET_LOG(GPR_DEBUG, "perform_stream_op"); - stream_obj *s = (stream_obj *)gs; - s->curr_gs = gs; - memcpy(&s->curr_ct, gt, sizeof(grpc_cronet_transport)); - add_to_storage(s, op); if (op->send_initial_metadata && header_has_authority(op->send_initial_metadata->list.head)) { /* Cronet does not support :authority header field. We cancel the call when - this field is present in metadata */ - bidirectional_stream_header_array header_array; - bidirectional_stream_header *header; - bidirectional_stream cbs; - CRONET_LOG(GPR_DEBUG, - ":authority header is provided but not supported;" - " cancel operations"); - /* Notify application that operation is cancelled by forging trailers */ - header_array.count = 1; - header_array.capacity = 1; - header_array.headers = gpr_malloc(sizeof(bidirectional_stream_header)); - header = (bidirectional_stream_header *)header_array.headers; - header->key = "grpc-status"; - header->value = "1"; /* Return status GRPC_STATUS_CANCELLED */ - cbs.annotation = (void *)s; - s->state.state_op_done[OP_CANCEL_ERROR] = true; - on_response_trailers_received(&cbs, &header_array); - gpr_free(header_array.headers); - } else { - execute_from_storage(s); + this field is present in metadata */ + if (op->recv_initial_metadata_ready) { + grpc_closure_sched(exec_ctx, op->recv_initial_metadata_ready, + GRPC_ERROR_CANCELLED); + } + if (op->recv_message_ready) { + grpc_closure_sched(exec_ctx, op->recv_message_ready, + GRPC_ERROR_CANCELLED); + } + grpc_closure_sched(exec_ctx, op->on_complete, GRPC_ERROR_CANCELLED); + return; } + stream_obj *s = (stream_obj *)gs; + add_to_storage(s, op); + execute_from_storage(s); } static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs, void *and_free_memory) { + grpc_stream *gs, + grpc_closure *then_schedule_closure) { stream_obj *s = (stream_obj *)gs; + null_and_maybe_free_read_buffer(s); GRPC_ERROR_UNREF(s->state.cancel_error); + grpc_closure_sched(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE); } static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {} @@ -1197,14 +1303,58 @@ static grpc_endpoint *get_endpoint(grpc_exec_ctx *exec_ctx, static void perform_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_transport_op *op) {} -const grpc_transport_vtable grpc_cronet_vtable = {sizeof(stream_obj), - "cronet_http", - init_stream, - set_pollset_do_nothing, - set_pollset_set_do_nothing, - perform_stream_op, - perform_op, - destroy_stream, - destroy_transport, - get_peer, - get_endpoint}; +static const grpc_transport_vtable grpc_cronet_vtable = { + sizeof(stream_obj), + "cronet_http", + init_stream, + set_pollset_do_nothing, + set_pollset_set_do_nothing, + perform_stream_op, + perform_op, + destroy_stream, + destroy_transport, + get_peer, + get_endpoint}; + +grpc_transport *grpc_create_cronet_transport(void *engine, const char *target, + const grpc_channel_args *args, + void *reserved) { + grpc_cronet_transport *ct = gpr_malloc(sizeof(grpc_cronet_transport)); + if (!ct) { + goto error; + } + ct->base.vtable = &grpc_cronet_vtable; + ct->engine = engine; + ct->host = gpr_malloc(strlen(target) + 1); + if (!ct->host) { + goto error; + } + strcpy(ct->host, target); + + ct->use_packet_coalescing = true; + if (args) { + for (size_t i = 0; i < args->num_args; i++) { + if (0 == + strcmp(args->args[i].key, GRPC_ARG_USE_CRONET_PACKET_COALESCING)) { + if (args->args[i].type != GRPC_ARG_INTEGER) { + gpr_log(GPR_ERROR, "%s ignored: it must be an integer", + GRPC_ARG_USE_CRONET_PACKET_COALESCING); + } else { + ct->use_packet_coalescing = (args->args[i].value.integer != 0); + } + } + } + } + + return &ct->base; + +error: + if (ct) { + if (ct->host) { + gpr_free(ct->host); + } + gpr_free(ct); + } + + return NULL; +} diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.h b/src/core/ext/transport/cronet/transport/cronet_transport.h new file mode 100644 index 0000000000..169ce31fd7 --- /dev/null +++ b/src/core/ext/transport/cronet/transport/cronet_transport.h @@ -0,0 +1,43 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_EXT_TRANSPORT_CRONET_TRANSPORT_CRONET_TRANSPORT_H +#define GRPC_CORE_EXT_TRANSPORT_CRONET_TRANSPORT_CRONET_TRANSPORT_H + +#include "src/core/lib/transport/transport.h" + +grpc_transport *grpc_create_cronet_transport(void *engine, const char *target, + const grpc_channel_args *args, + void *reserved); + +#endif /* GRPC_CORE_EXT_TRANSPORT_CRONET_TRANSPORT_CRONET_TRANSPORT_H */ |